Merge pull request #2087 from influxdata/flux-staging

Update flux to v0.12.0
pull/10616/head
Nathaniel Cook 2019-01-03 15:11:10 -07:00 committed by GitHub
commit 52a6e7a69d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
323 changed files with 527 additions and 691 deletions

View File

@ -0,0 +1,447 @@
package launcher
import (
"context"
"fmt"
"io"
"net"
nethttp "net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"sync"
"time"
"github.com/influxdata/flux/control"
"github.com/influxdata/flux/execute"
"github.com/influxdata/platform"
"github.com/influxdata/platform/bolt"
"github.com/influxdata/platform/chronograf/server"
"github.com/influxdata/platform/gather"
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/internal/fs"
"github.com/influxdata/platform/kit/cli"
"github.com/influxdata/platform/kit/prom"
influxlogger "github.com/influxdata/platform/logger"
"github.com/influxdata/platform/nats"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
pcontrol "github.com/influxdata/platform/query/control"
"github.com/influxdata/platform/snowflake"
"github.com/influxdata/platform/source"
"github.com/influxdata/platform/storage"
"github.com/influxdata/platform/storage/readservice"
"github.com/influxdata/platform/task"
taskbackend "github.com/influxdata/platform/task/backend"
taskbolt "github.com/influxdata/platform/task/backend/bolt"
"github.com/influxdata/platform/task/backend/coordinator"
taskexecutor "github.com/influxdata/platform/task/backend/executor"
_ "github.com/influxdata/platform/tsdb/tsi1"
_ "github.com/influxdata/platform/tsdb/tsm1"
"github.com/influxdata/platform/vault"
pzap "github.com/influxdata/platform/zap"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Launcher represents the main program execution.
type Launcher struct {
	wg      sync.WaitGroup // tracks background goroutines (HTTP serve loop, scraper); waited on in Shutdown
	cancel  func()         // cancels the run context; invoked via Cancel()
	running bool           // set true at the start of run(); reported by Running()

	// CLI flag destinations, populated by Run via kit/cli.
	logLevel        string
	httpBindAddress string
	boltPath        string
	natsPath        string
	developerMode   bool
	enginePath      string
	secretStore     string // "bolt" or "vault"

	// Long-lived services created in run() and torn down in Shutdown().
	boltClient      *bolt.Client
	engine          *storage.Engine
	queryController *pcontrol.Controller

	httpPort   int // actual bound port (resolved from the listener, supports ":0")
	httpServer *nethttp.Server
	natsServer *nats.Server

	scheduler *taskbackend.TickScheduler

	logger *zap.Logger

	// Standard in/out/err; overridable for tests (see NewLauncher).
	Stdin  io.Reader
	Stdout io.Writer
	Stderr io.Writer
}
// NewLauncher returns a new instance of Launcher connected to standard in/out/err.
func NewLauncher() *Launcher {
	l := new(Launcher)
	l.Stdin = os.Stdin
	l.Stdout = os.Stdout
	l.Stderr = os.Stderr
	return l
}
// Running reports whether run() has been invoked, i.e. the daemon has started.
func (m *Launcher) Running() bool {
	return m.running
}
// URL returns the URL to connect to the HTTP server.
func (m *Launcher) URL() string {
	// httpPort holds the port of the actual listener, so this is correct even
	// when the bind address requested an ephemeral port (":0").
	addr := fmt.Sprintf("127.0.0.1:%d", m.httpPort)
	return "http://" + addr
}
// Shutdown shuts down the HTTP server and waits for all services to clean up.
// Services are stopped in reverse dependency order: the HTTP server first, so
// nothing below is still receiving work while it is torn down.
func (m *Launcher) Shutdown(ctx context.Context) {
	// Fix: the error from the HTTP server shutdown was previously discarded,
	// while every other service's close error is logged.
	if err := m.httpServer.Shutdown(ctx); err != nil {
		m.logger.Error("failed to shut down http server", zap.Error(err))
	}

	m.logger.Info("Stopping", zap.String("service", "task"))
	m.scheduler.Stop()

	m.logger.Info("Stopping", zap.String("service", "nats"))
	m.natsServer.Close()

	m.logger.Info("Stopping", zap.String("service", "bolt"))
	if err := m.boltClient.Close(); err != nil {
		m.logger.Info("failed closing bolt", zap.Error(err))
	}

	m.logger.Info("Stopping", zap.String("service", "query"))
	if err := m.queryController.Shutdown(ctx); err != nil {
		m.logger.Info("Failed closing query service", zap.Error(err))
	}

	m.logger.Info("Stopping", zap.String("service", "storage-engine"))
	if err := m.engine.Close(); err != nil {
		m.logger.Error("failed to close engine", zap.Error(err))
	}

	// Wait for background goroutines (HTTP serve loop, scraper) to exit.
	m.wg.Wait()

	// Sync can legitimately fail when stdout is a terminal, and there is
	// nowhere left to report it; the error is deliberately ignored.
	_ = m.logger.Sync()
}
// Cancel executes the context cancel on the program. Used for testing.
// Must only be called after Run has begun, which is when m.cancel is set.
func (m *Launcher) Cancel() { m.cancel() }
// Run executes the program with the given CLI arguments.
// It resolves the influx directory for path defaults, declares the CLI
// options, and delegates actual startup to run() via the command runner.
func (m *Launcher) Run(ctx context.Context, args ...string) error {
	dir, err := fs.InfluxDir()
	if err != nil {
		return fmt.Errorf("failed to determine influx directory: %v", err)
	}

	opts := []cli.Opt{
		{
			DestP:   &m.logLevel,
			Flag:    "log-level",
			Default: "info",
			Desc:    "supported log levels are debug, info, and error",
		},
		{
			DestP:   &m.httpBindAddress,
			Flag:    "http-bind-address",
			Default: ":9999",
			Desc:    "bind address for the REST HTTP API",
		},
		{
			DestP:   &m.boltPath,
			Flag:    "bolt-path",
			Default: filepath.Join(dir, "influxd.bolt"),
			Desc:    "path to boltdb database",
		},
		{
			DestP:   &m.developerMode,
			Flag:    "developer-mode",
			Default: false,
			Desc:    "serve assets from the local filesystem in developer mode",
		},
		{
			DestP:   &m.natsPath,
			Flag:    "nats-path",
			Default: filepath.Join(dir, "nats"),
			Desc:    "path to NATS queue for scraping tasks",
		},
		{
			DestP:   &m.enginePath,
			Flag:    "engine-path",
			Default: filepath.Join(dir, "engine"),
			Desc:    "path to persistent engine files",
		},
		{
			DestP:   &m.secretStore,
			Flag:    "secret-store",
			Default: "bolt",
			Desc:    "data store for secrets (bolt or vault)",
		},
	}

	cmd := cli.NewCommand(&cli.Program{
		Name: "influxd",
		Run:  func() error { return m.run(ctx) },
		Opts: opts,
	})
	cmd.SetArgs(args)
	return cmd.Execute()
}
// run starts every influxd service in dependency order: logging and tracing,
// the bolt metadata store, the secret store, the chronograf service, the
// storage engine and query controller, the task scheduler, the NATS queue and
// scraper, and finally the HTTP API server. It returns once the HTTP listener
// goroutine has been started, or on the first setup error.
func (m *Launcher) run(ctx context.Context) (err error) {
	m.running = true
	ctx, m.cancel = context.WithCancel(ctx)

	var lvl zapcore.Level
	if err := lvl.Set(m.logLevel); err != nil {
		return fmt.Errorf("unknown log level; supported levels are debug, info, and error")
	}

	// Create top level logger
	logconf := &influxlogger.Config{
		Format: "auto",
		Level:  lvl,
	}
	m.logger, err = logconf.New(m.Stdout)
	if err != nil {
		return err
	}

	// set tracing
	tracer := new(pzap.Tracer)
	tracer.Logger = m.logger
	tracer.IDGenerator = snowflake.NewIDGenerator()
	opentracing.SetGlobalTracer(tracer)

	reg := prom.NewRegistry()
	reg.MustRegister(prometheus.NewGoCollector())
	reg.WithLogger(m.logger)

	m.boltClient = bolt.NewClient()
	m.boltClient.Path = m.boltPath
	m.boltClient.WithLogger(m.logger.With(zap.String("service", "bolt")))

	if err := m.boltClient.Open(ctx); err != nil {
		m.logger.Error("failed opening bolt", zap.Error(err))
		return err
	}

	// The bolt client implements nearly every platform service; each one is
	// bound to its own name so individual services can be swapped out below
	// (e.g. the secret service when --secret-store=vault).
	var (
		orgSvc           platform.OrganizationService         = m.boltClient
		authSvc          platform.AuthorizationService        = m.boltClient
		userSvc          platform.UserService                 = m.boltClient
		viewSvc          platform.ViewService                 = m.boltClient
		macroSvc         platform.MacroService                = m.boltClient
		bucketSvc        platform.BucketService               = m.boltClient
		sourceSvc        platform.SourceService               = m.boltClient
		sessionSvc       platform.SessionService              = m.boltClient
		basicAuthSvc     platform.BasicAuthService            = m.boltClient
		dashboardSvc     platform.DashboardService            = m.boltClient
		dashboardLogSvc  platform.DashboardOperationLogService = m.boltClient
		userLogSvc       platform.UserOperationLogService     = m.boltClient
		bucketLogSvc     platform.BucketOperationLogService   = m.boltClient
		orgLogSvc        platform.OrganizationOperationLogService = m.boltClient
		onboardingSvc    platform.OnboardingService           = m.boltClient
		scraperTargetSvc platform.ScraperTargetStoreService   = m.boltClient
		telegrafSvc      platform.TelegrafConfigStore         = m.boltClient
		userResourceSvc  platform.UserResourceMappingService  = m.boltClient
		labelSvc         platform.LabelService                = m.boltClient
		secretSvc        platform.SecretService               = m.boltClient
		lookupSvc        platform.LookupService               = m.boltClient
	)

	switch m.secretStore {
	case "bolt":
		// If it is bolt, then we already set it above.
	case "vault":
		// The vault secret service is configured using the standard vault environment variables.
		// https://www.vaultproject.io/docs/commands/index.html#environment-variables
		svc, err := vault.NewSecretService()
		if err != nil {
			// Fix: "initalizing" typo corrected in the log message.
			m.logger.Error("failed initializing vault secret service", zap.Error(err))
			return err
		}
		secretSvc = svc
	default:
		err := fmt.Errorf("unknown secret service %q, expected \"bolt\" or \"vault\"", m.secretStore)
		m.logger.Error("failed setting secret service", zap.Error(err))
		return err
	}

	chronografSvc, err := server.NewServiceV2(ctx, m.boltClient.DB())
	if err != nil {
		m.logger.Error("failed creating chronograf service", zap.Error(err))
		return err
	}

	// Storage engine and query controller.
	var pointsWriter storage.PointsWriter
	{
		m.engine = storage.NewEngine(m.enginePath, storage.NewConfig(), storage.WithRetentionEnforcer(bucketSvc))
		m.engine.WithLogger(m.logger)

		if err := m.engine.Open(); err != nil {
			m.logger.Error("failed to open engine", zap.Error(err))
			return err
		}
		// The Engine's metrics must be registered after it opens.
		reg.MustRegister(m.engine.PrometheusCollectors()...)

		pointsWriter = m.engine

		const (
			concurrencyQuota = 10
			memoryBytesQuota = 1e6
		)

		cc := control.Config{
			ExecutorDependencies: make(execute.Dependencies),
			ConcurrencyQuota:     concurrencyQuota,
			MemoryBytesQuota:     int64(memoryBytesQuota),
			Logger:               m.logger.With(zap.String("service", "storage-reads")),
		}
		if err := readservice.AddControllerConfigDependencies(
			&cc, m.engine, bucketSvc, orgSvc,
		); err != nil {
			m.logger.Error("Failed to configure query controller dependencies", zap.Error(err))
			return err
		}

		m.queryController = pcontrol.New(cc)
		reg.MustRegister(m.queryController.PrometheusCollectors()...)
	}

	var storageQueryService query.ProxyQueryService = readservice.NewProxyQueryService(m.queryController)

	// Task scheduler backed by the bolt task store and the query controller.
	var taskSvc platform.TaskService
	{
		boltStore, err := taskbolt.New(m.boltClient.DB(), "tasks")
		if err != nil {
			m.logger.Error("failed opening task bolt", zap.Error(err))
			return err
		}

		executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, boltStore)

		lw := taskbackend.NewPointLogWriter(pointsWriter)
		m.scheduler = taskbackend.NewScheduler(boltStore, executor, lw, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
		m.scheduler.Start(ctx)
		reg.MustRegister(m.scheduler.PrometheusCollectors()...)

		queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
		lr := taskbackend.NewQueryLogReader(queryService)
		taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), lr, m.scheduler)
		taskSvc = task.NewValidator(taskSvc, bucketSvc)
	}

	// NATS streaming server
	m.natsServer = nats.NewServer(nats.Config{FilestoreDir: m.natsPath})
	if err := m.natsServer.Open(); err != nil {
		m.logger.Error("failed to start nats streaming server", zap.Error(err))
		return err
	}

	publisher := nats.NewAsyncPublisher("nats-publisher")
	if err := publisher.Open(); err != nil {
		m.logger.Error("failed to connect to streaming server", zap.Error(err))
		return err
	}

	// TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed.
	subscriber := nats.NewQueueSubscriber("nats-subscriber")
	if err := subscriber.Open(); err != nil {
		m.logger.Error("failed to connect to streaming server", zap.Error(err))
		return err
	}

	scraperScheduler, err := gather.NewScheduler(10, m.logger, scraperTargetSvc, publisher, subscriber, 0, 0)
	if err != nil {
		m.logger.Error("failed to create scraper subscriber", zap.Error(err))
		return err
	}

	// Run the scraper in the background; Shutdown waits on m.wg for it.
	m.wg.Add(1)
	go func(logger *zap.Logger) {
		defer m.wg.Done()
		logger = logger.With(zap.String("service", "scraper"))
		if err := scraperScheduler.Run(ctx); err != nil {
			logger.Error("failed scraper service", zap.Error(err))
		}
		logger.Info("Stopping")
	}(m.logger)

	m.httpServer = &nethttp.Server{
		Addr: m.httpBindAddress,
	}

	handlerConfig := &http.APIBackend{
		DeveloperMode:                   m.developerMode,
		Logger:                          m.logger,
		NewBucketService:                source.NewBucketService,
		NewQueryService:                 source.NewQueryService,
		PointsWriter:                    pointsWriter,
		AuthorizationService:            authSvc,
		BucketService:                   bucketSvc,
		SessionService:                  sessionSvc,
		UserService:                     userSvc,
		OrganizationService:             orgSvc,
		UserResourceMappingService:      userResourceSvc,
		LabelService:                    labelSvc,
		DashboardService:                dashboardSvc,
		DashboardOperationLogService:    dashboardLogSvc,
		BucketOperationLogService:       bucketLogSvc,
		UserOperationLogService:         userLogSvc,
		OrganizationOperationLogService: orgLogSvc,
		ViewService:                     viewSvc,
		SourceService:                   sourceSvc,
		MacroService:                    macroSvc,
		BasicAuthService:                basicAuthSvc,
		OnboardingService:               onboardingSvc,
		ProxyQueryService:               storageQueryService,
		TaskService:                     taskSvc,
		TelegrafService:                 telegrafSvc,
		ScraperTargetStoreService:       scraperTargetSvc,
		ChronografService:               chronografSvc,
		SecretService:                   secretSvc,
		LookupService:                   lookupSvc,
	}

	// HTTP server
	httpLogger := m.logger.With(zap.String("service", "http"))
	platformHandler := http.NewPlatformHandler(handlerConfig)
	reg.MustRegister(platformHandler.PrometheusCollectors()...)

	h := http.NewHandlerFromRegistry("platform", reg)
	h.Handler = platformHandler
	h.Logger = httpLogger
	h.Tracer = opentracing.GlobalTracer()

	m.httpServer.Handler = h

	ln, err := net.Listen("tcp", m.httpBindAddress)
	if err != nil {
		httpLogger.Error("failed http listener", zap.Error(err))
		httpLogger.Info("Stopping")
		return err
	}

	// Record the actual bound port so URL() works with ephemeral ports (":0").
	if addr, ok := ln.Addr().(*net.TCPAddr); ok {
		m.httpPort = addr.Port
	}

	// Serve in the background; ErrServerClosed is the expected result of a
	// graceful Shutdown and is not treated as a failure.
	m.wg.Add(1)
	go func(logger *zap.Logger) {
		defer m.wg.Done()
		logger.Info("Listening", zap.String("transport", "http"), zap.String("addr", m.httpBindAddress), zap.Int("port", m.httpPort))
		if err := m.httpServer.Serve(ln); err != nethttp.ErrServerClosed {
			logger.Error("failed http service", zap.Error(err))
		}
		logger.Info("Stopping")
	}(httpLogger)

	return nil
}

View File

@ -1,4 +1,4 @@
package main_test
package launcher_test
import (
"bytes"
@ -12,23 +12,24 @@ import (
"strings"
"testing"
"github.com/influxdata/platform/cmd/influxd/launcher"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform"
"github.com/influxdata/platform/cmd/influxd"
"github.com/influxdata/platform/http"
)
// Default context.
var ctx = context.Background()
func TestMain_Setup(t *testing.T) {
m := NewMain()
if err := m.Run(ctx); err != nil {
func TestLauncher_Setup(t *testing.T) {
l := NewLauncher()
if err := l.Run(ctx); err != nil {
t.Fatal(err)
}
defer m.Shutdown(ctx)
defer l.Shutdown(ctx)
svc := &http.SetupService{Addr: m.URL()}
svc := &http.SetupService{Addr: l.URL()}
if results, err := svc.Generate(ctx, &platform.OnboardingRequest{
User: "USER",
Password: "PASSWORD",
@ -47,13 +48,13 @@ func TestMain_Setup(t *testing.T) {
}
}
func TestMain_WriteAndQuery(t *testing.T) {
m := RunMainOrFail(t, ctx)
m.SetupOrFail(t)
defer m.ShutdownOrFail(t, ctx)
func TestLauncher_WriteAndQuery(t *testing.T) {
l := RunLauncherOrFail(t, ctx)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
// Execute single write against the server.
resp, err := nethttp.DefaultClient.Do(m.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", m.Org.ID, m.Bucket.ID), `m,k=v f=100i 946684800000000000`))
resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, l.Bucket.ID), `m,k=v f=100i 946684800000000000`))
if err != nil {
t.Fatal(err)
}
@ -77,19 +78,19 @@ func TestMain_WriteAndQuery(t *testing.T) {
`,result,table,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v` + "\r\n\r\n"
var buf bytes.Buffer
req := (http.QueryRequest{Query: qs, Org: m.Org}).WithDefaults()
req := (http.QueryRequest{Query: qs, Org: l.Org}).WithDefaults()
if preq, err := req.ProxyRequest(); err != nil {
t.Fatal(err)
} else if _, err := m.FluxService().Query(ctx, &buf, preq); err != nil {
} else if _, err := l.FluxService().Query(ctx, &buf, preq); err != nil {
t.Fatal(err)
} else if diff := cmp.Diff(buf.String(), exp); diff != "" {
t.Fatal(diff)
}
}
// Main is a test wrapper for main.Main.
type Main struct {
*main.Main
// Launcher is a test wrapper for launcher.Launcher.
type Launcher struct {
*launcher.Launcher
// Root temporary directory for all data.
Path string
@ -106,63 +107,63 @@ type Main struct {
Stderr bytes.Buffer
}
// NewMain returns a new instance of Main.
func NewMain() *Main {
m := &Main{Main: main.NewMain()}
m.Main.Stdin = &m.Stdin
m.Main.Stdout = &m.Stdout
m.Main.Stderr = &m.Stderr
// NewLauncher returns a new instance of Launcher.
func NewLauncher() *Launcher {
l := &Launcher{Launcher: launcher.NewLauncher()}
l.Launcher.Stdin = &l.Stdin
l.Launcher.Stdout = &l.Stdout
l.Launcher.Stderr = &l.Stderr
if testing.Verbose() {
m.Main.Stdout = io.MultiWriter(m.Main.Stdout, os.Stdout)
m.Main.Stderr = io.MultiWriter(m.Main.Stderr, os.Stderr)
l.Launcher.Stdout = io.MultiWriter(l.Launcher.Stdout, os.Stdout)
l.Launcher.Stderr = io.MultiWriter(l.Launcher.Stderr, os.Stderr)
}
path, err := ioutil.TempDir("", "")
if err != nil {
panic(err)
}
m.Path = path
return m
l.Path = path
return l
}
// RunMainOrFail initializes and starts the server.
func RunMainOrFail(tb testing.TB, ctx context.Context, args ...string) *Main {
// RunLauncherOrFail initializes and starts the server.
func RunLauncherOrFail(tb testing.TB, ctx context.Context, args ...string) *Launcher {
tb.Helper()
m := NewMain()
if err := m.Run(ctx, args...); err != nil {
l := NewLauncher()
if err := l.Run(ctx, args...); err != nil {
tb.Fatal(err)
}
return m
return l
}
// Run executes the program with additional arguments to set paths and ports.
func (m *Main) Run(ctx context.Context, args ...string) error {
args = append(args, "--bolt-path", filepath.Join(m.Path, "influxd.bolt"))
args = append(args, "--engine-path", filepath.Join(m.Path, "engine"))
args = append(args, "--nats-path", filepath.Join(m.Path, "nats"))
func (l *Launcher) Run(ctx context.Context, args ...string) error {
args = append(args, "--bolt-path", filepath.Join(l.Path, "influxd.bolt"))
args = append(args, "--engine-path", filepath.Join(l.Path, "engine"))
args = append(args, "--nats-path", filepath.Join(l.Path, "nats"))
args = append(args, "--http-bind-address", "127.0.0.1:0")
args = append(args, "--log-level", "debug")
return m.Main.Run(ctx, args...)
return l.Launcher.Run(ctx, args...)
}
// Shutdown stops the program and cleans up temporary paths.
func (m *Main) Shutdown(ctx context.Context) error {
m.Cancel()
m.Main.Shutdown(ctx)
return os.RemoveAll(m.Path)
func (l *Launcher) Shutdown(ctx context.Context) error {
l.Cancel()
l.Launcher.Shutdown(ctx)
return os.RemoveAll(l.Path)
}
// ShutdownOrFail stops the program and cleans up temporary paths. Fail on error.
func (m *Main) ShutdownOrFail(tb testing.TB, ctx context.Context) {
func (l *Launcher) ShutdownOrFail(tb testing.TB, ctx context.Context) {
tb.Helper()
if err := m.Shutdown(ctx); err != nil {
if err := l.Shutdown(ctx); err != nil {
tb.Fatal(err)
}
}
// SetupOrFail creates a new user, bucket, org, and auth token. Fail on error.
func (m *Main) SetupOrFail(tb testing.TB) {
svc := &http.SetupService{Addr: m.URL()}
func (l *Launcher) SetupOrFail(tb testing.TB) {
svc := &http.SetupService{Addr: l.URL()}
results, err := svc.Generate(ctx, &platform.OnboardingRequest{
User: "USER",
Password: "PASSWORD",
@ -173,23 +174,23 @@ func (m *Main) SetupOrFail(tb testing.TB) {
tb.Fatal(err)
}
m.User = results.User
m.Org = results.Org
m.Bucket = results.Bucket
m.Auth = results.Auth
l.User = results.User
l.Org = results.Org
l.Bucket = results.Bucket
l.Auth = results.Auth
}
func (m *Main) FluxService() *http.FluxService {
return &http.FluxService{Addr: m.URL(), Token: m.Auth.Token}
func (l *Launcher) FluxService() *http.FluxService {
return &http.FluxService{Addr: l.URL(), Token: l.Auth.Token}
}
// MustNewHTTPRequest returns a new nethttp.Request with base URL and auth attached. Fail on error.
func (m *Main) MustNewHTTPRequest(method, rawurl, body string) *nethttp.Request {
req, err := nethttp.NewRequest(method, m.URL()+rawurl, strings.NewReader(body))
func (l *Launcher) MustNewHTTPRequest(method, rawurl, body string) *nethttp.Request {
req, err := nethttp.NewRequest(method, l.URL()+rawurl, strings.NewReader(body))
if err != nil {
panic(err)
}
req.Header.Set("Authorization", "Token "+m.Auth.Token)
req.Header.Set("Authorization", "Token "+l.Auth.Token)
return req
}

View File

@ -3,48 +3,16 @@ package main
import (
"context"
"fmt"
"io"
"net"
nethttp "net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"sync"
"time"
"github.com/influxdata/flux/control"
"github.com/influxdata/flux/execute"
"github.com/influxdata/platform"
"github.com/influxdata/platform/bolt"
"github.com/influxdata/platform/chronograf/server"
"github.com/influxdata/platform/gather"
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/internal/fs"
"github.com/influxdata/platform/kit/cli"
"github.com/influxdata/platform/kit/prom"
"github.com/influxdata/platform/cmd/influxd/launcher"
"github.com/influxdata/platform/kit/signals"
influxlogger "github.com/influxdata/platform/logger"
"github.com/influxdata/platform/nats"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
pcontrol "github.com/influxdata/platform/query/control"
"github.com/influxdata/platform/snowflake"
"github.com/influxdata/platform/source"
"github.com/influxdata/platform/storage"
"github.com/influxdata/platform/storage/readservice"
"github.com/influxdata/platform/task"
taskbackend "github.com/influxdata/platform/task/backend"
taskbolt "github.com/influxdata/platform/task/backend/bolt"
"github.com/influxdata/platform/task/backend/coordinator"
taskexecutor "github.com/influxdata/platform/task/backend/executor"
_ "github.com/influxdata/platform/tsdb/tsi1"
_ "github.com/influxdata/platform/tsdb/tsm1"
"github.com/influxdata/platform/vault"
pzap "github.com/influxdata/platform/zap"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func main() {
@ -52,11 +20,11 @@ func main() {
ctx := context.Background()
ctx = signals.WithStandardSignals(ctx)
m := NewMain()
m := launcher.NewLauncher()
if err := m.Run(ctx, os.Args[1:]...); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
} else if !m.running {
} else if !m.Running() {
os.Exit(1)
}
@ -67,399 +35,3 @@ func main() {
defer cancel()
m.Shutdown(ctx)
}
// Main represents the main program execution.
type Main struct {
wg sync.WaitGroup
cancel func()
running bool
logLevel string
httpBindAddress string
boltPath string
natsPath string
developerMode bool
enginePath string
secretStore string
boltClient *bolt.Client
engine *storage.Engine
queryController *pcontrol.Controller
httpPort int
httpServer *nethttp.Server
natsServer *nats.Server
scheduler *taskbackend.TickScheduler
logger *zap.Logger
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
// NewMain returns a new instance of Main connected to standard in/out/err.
func NewMain() *Main {
return &Main{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
}
}
// URL returns the URL to connect to the HTTP server.
func (m *Main) URL() string {
return fmt.Sprintf("http://localhost:%d", m.httpPort)
}
// Shutdown shuts down the HTTP server and waits for all services to clean up.
func (m *Main) Shutdown(ctx context.Context) {
m.httpServer.Shutdown(ctx)
m.logger.Info("Stopping", zap.String("service", "task"))
m.scheduler.Stop()
m.logger.Info("Stopping", zap.String("service", "nats"))
m.natsServer.Close()
m.logger.Info("Stopping", zap.String("service", "bolt"))
if err := m.boltClient.Close(); err != nil {
m.logger.Info("failed closing bolt", zap.Error(err))
}
m.logger.Info("Stopping", zap.String("service", "query"))
if err := m.queryController.Shutdown(ctx); err != nil {
m.logger.Info("Failed closing query service", zap.Error(err))
}
m.logger.Info("Stopping", zap.String("service", "storage-engine"))
if err := m.engine.Close(); err != nil {
m.logger.Error("failed to close engine", zap.Error(err))
}
m.wg.Wait()
m.logger.Sync()
}
// Cancel executes the context cancel on the program. Used for testing.
func (m *Main) Cancel() { m.cancel() }
// Run executes the program with the given CLI arguments.
func (m *Main) Run(ctx context.Context, args ...string) error {
dir, err := fs.InfluxDir()
if err != nil {
return fmt.Errorf("failed to determine influx directory: %v", err)
}
prog := &cli.Program{
Name: "influxd",
Run: func() error { return m.run(ctx) },
Opts: []cli.Opt{
{
DestP: &m.logLevel,
Flag: "log-level",
Default: "info",
Desc: "supported log levels are debug, info, and error",
},
{
DestP: &m.httpBindAddress,
Flag: "http-bind-address",
Default: ":9999",
Desc: "bind address for the REST HTTP API",
},
{
DestP: &m.boltPath,
Flag: "bolt-path",
Default: filepath.Join(dir, "influxd.bolt"),
Desc: "path to boltdb database",
},
{
DestP: &m.developerMode,
Flag: "developer-mode",
Default: false,
Desc: "serve assets from the local filesystem in developer mode",
},
{
DestP: &m.natsPath,
Flag: "nats-path",
Default: filepath.Join(dir, "nats"),
Desc: "path to NATS queue for scraping tasks",
},
{
DestP: &m.enginePath,
Flag: "engine-path",
Default: filepath.Join(dir, "engine"),
Desc: "path to persistent engine files",
},
{
DestP: &m.secretStore,
Flag: "secret-store",
Default: "bolt",
Desc: "data store for secrets (bolt or vault)",
},
},
}
cmd := cli.NewCommand(prog)
cmd.SetArgs(args)
return cmd.Execute()
}
func (m *Main) run(ctx context.Context) (err error) {
m.running = true
ctx, m.cancel = context.WithCancel(ctx)
var lvl zapcore.Level
if err := lvl.Set(m.logLevel); err != nil {
return fmt.Errorf("unknown log level; supported levels are debug, info, and error")
}
// Create top level logger
logconf := &influxlogger.Config{
Format: "auto",
Level: lvl,
}
m.logger, err = logconf.New(m.Stdout)
if err != nil {
return err
}
// set tracing
tracer := new(pzap.Tracer)
tracer.Logger = m.logger
tracer.IDGenerator = snowflake.NewIDGenerator()
opentracing.SetGlobalTracer(tracer)
reg := prom.NewRegistry()
reg.MustRegister(prometheus.NewGoCollector())
reg.WithLogger(m.logger)
m.boltClient = bolt.NewClient()
m.boltClient.Path = m.boltPath
m.boltClient.WithLogger(m.logger.With(zap.String("service", "bolt")))
if err := m.boltClient.Open(ctx); err != nil {
m.logger.Error("failed opening bolt", zap.Error(err))
return err
}
var (
orgSvc platform.OrganizationService = m.boltClient
authSvc platform.AuthorizationService = m.boltClient
userSvc platform.UserService = m.boltClient
viewSvc platform.ViewService = m.boltClient
macroSvc platform.MacroService = m.boltClient
bucketSvc platform.BucketService = m.boltClient
sourceSvc platform.SourceService = m.boltClient
sessionSvc platform.SessionService = m.boltClient
basicAuthSvc platform.BasicAuthService = m.boltClient
dashboardSvc platform.DashboardService = m.boltClient
dashboardLogSvc platform.DashboardOperationLogService = m.boltClient
userLogSvc platform.UserOperationLogService = m.boltClient
bucketLogSvc platform.BucketOperationLogService = m.boltClient
orgLogSvc platform.OrganizationOperationLogService = m.boltClient
onboardingSvc platform.OnboardingService = m.boltClient
scraperTargetSvc platform.ScraperTargetStoreService = m.boltClient
telegrafSvc platform.TelegrafConfigStore = m.boltClient
userResourceSvc platform.UserResourceMappingService = m.boltClient
labelSvc platform.LabelService = m.boltClient
secretSvc platform.SecretService = m.boltClient
lookupSvc platform.LookupService = m.boltClient
)
switch m.secretStore {
case "bolt":
// If it is bolt, then we already set it above.
case "vault":
// The vault secret service is configured using the standard vault environment variables.
// https://www.vaultproject.io/docs/commands/index.html#environment-variables
svc, err := vault.NewSecretService()
if err != nil {
m.logger.Error("failed initalizing vault secret service", zap.Error(err))
return err
}
secretSvc = svc
default:
err := fmt.Errorf("unknown secret service %q, expected \"bolt\" or \"vault\"", m.secretStore)
m.logger.Error("failed setting secret service", zap.Error(err))
return err
}
chronografSvc, err := server.NewServiceV2(ctx, m.boltClient.DB())
if err != nil {
m.logger.Error("failed creating chronograf service", zap.Error(err))
return err
}
var pointsWriter storage.PointsWriter
{
m.engine = storage.NewEngine(m.enginePath, storage.NewConfig(), storage.WithRetentionEnforcer(bucketSvc))
m.engine.WithLogger(m.logger)
if err := m.engine.Open(); err != nil {
m.logger.Error("failed to open engine", zap.Error(err))
return err
}
// The Engine's metrics must be registered after it opens.
reg.MustRegister(m.engine.PrometheusCollectors()...)
pointsWriter = m.engine
const (
concurrencyQuota = 10
memoryBytesQuota = 1e6
)
cc := control.Config{
ExecutorDependencies: make(execute.Dependencies),
ConcurrencyQuota: concurrencyQuota,
MemoryBytesQuota: int64(memoryBytesQuota),
Logger: m.logger.With(zap.String("service", "storage-reads")),
}
if err := readservice.AddControllerConfigDependencies(
&cc, m.engine, bucketSvc, orgSvc,
); err != nil {
m.logger.Error("Failed to configure query controller dependencies", zap.Error(err))
return err
}
m.queryController = pcontrol.New(cc)
reg.MustRegister(m.queryController.PrometheusCollectors()...)
}
var storageQueryService query.ProxyQueryService = readservice.NewProxyQueryService(m.queryController)
var taskSvc platform.TaskService
{
boltStore, err := taskbolt.New(m.boltClient.DB(), "tasks")
if err != nil {
m.logger.Error("failed opening task bolt", zap.Error(err))
return err
}
executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, boltStore)
lw := taskbackend.NewPointLogWriter(pointsWriter)
m.scheduler = taskbackend.NewScheduler(boltStore, executor, lw, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
m.scheduler.Start(ctx)
reg.MustRegister(m.scheduler.PrometheusCollectors()...)
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), lr, m.scheduler)
taskSvc = task.NewValidator(taskSvc, bucketSvc)
}
// NATS streaming server
m.natsServer = nats.NewServer(nats.Config{FilestoreDir: m.natsPath})
if err := m.natsServer.Open(); err != nil {
m.logger.Error("failed to start nats streaming server", zap.Error(err))
return err
}
publisher := nats.NewAsyncPublisher("nats-publisher")
if err := publisher.Open(); err != nil {
m.logger.Error("failed to connect to streaming server", zap.Error(err))
return err
}
// TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed.
subscriber := nats.NewQueueSubscriber("nats-subscriber")
if err := subscriber.Open(); err != nil {
m.logger.Error("failed to connect to streaming server", zap.Error(err))
return err
}
scraperScheduler, err := gather.NewScheduler(10, m.logger, scraperTargetSvc, publisher, subscriber, 0, 0)
if err != nil {
m.logger.Error("failed to create scraper subscriber", zap.Error(err))
return err
}
m.wg.Add(1)
go func(logger *zap.Logger) {
defer m.wg.Done()
logger = logger.With(zap.String("service", "scraper"))
if err := scraperScheduler.Run(ctx); err != nil {
logger.Error("failed scraper service", zap.Error(err))
}
logger.Info("Stopping")
}(m.logger)
m.httpServer = &nethttp.Server{
Addr: m.httpBindAddress,
}
handlerConfig := &http.APIBackend{
DeveloperMode: m.developerMode,
Logger: m.logger,
NewBucketService: source.NewBucketService,
NewQueryService: source.NewQueryService,
PointsWriter: pointsWriter,
AuthorizationService: authSvc,
BucketService: bucketSvc,
SessionService: sessionSvc,
UserService: userSvc,
OrganizationService: orgSvc,
UserResourceMappingService: userResourceSvc,
LabelService: labelSvc,
DashboardService: dashboardSvc,
DashboardOperationLogService: dashboardLogSvc,
BucketOperationLogService: bucketLogSvc,
UserOperationLogService: userLogSvc,
OrganizationOperationLogService: orgLogSvc,
ViewService: viewSvc,
SourceService: sourceSvc,
MacroService: macroSvc,
BasicAuthService: basicAuthSvc,
OnboardingService: onboardingSvc,
ProxyQueryService: storageQueryService,
TaskService: taskSvc,
TelegrafService: telegrafSvc,
ScraperTargetStoreService: scraperTargetSvc,
ChronografService: chronografSvc,
SecretService: secretSvc,
LookupService: lookupSvc,
}
// HTTP server
httpLogger := m.logger.With(zap.String("service", "http"))
platformHandler := http.NewPlatformHandler(handlerConfig)
reg.MustRegister(platformHandler.PrometheusCollectors()...)
h := http.NewHandlerFromRegistry("platform", reg)
h.Handler = platformHandler
h.Logger = httpLogger
h.Tracer = opentracing.GlobalTracer()
m.httpServer.Handler = h
ln, err := net.Listen("tcp", m.httpBindAddress)
if err != nil {
httpLogger.Error("failed http listener", zap.Error(err))
httpLogger.Info("Stopping")
return err
}
if addr, ok := ln.Addr().(*net.TCPAddr); ok {
m.httpPort = addr.Port
}
m.wg.Add(1)
go func(logger *zap.Logger) {
defer m.wg.Done()
logger.Info("Listening", zap.String("transport", "http"), zap.String("addr", m.httpBindAddress), zap.Int("port", m.httpPort))
if err := m.httpServer.Serve(ln); err != nethttp.ErrServerClosed {
logger.Error("failed http service", zap.Error(err))
}
logger.Info("Stopping")
}(httpLogger)
return nil
}

4
go.mod
View File

@ -79,7 +79,7 @@ require (
github.com/hashicorp/vault-plugin-secrets-kv v0.0.0-20181106190520-2236f141171e // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/influxdata/flux v0.11.0
github.com/influxdata/flux v0.12.0
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect
@ -146,7 +146,7 @@ require (
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
golang.org/x/tools v0.0.0-20181121193951-91f80e683c10
golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2
google.golang.org/api v0.0.0-20181021000519-a2651947f503
google.golang.org/appengine v1.2.0 // indirect
google.golang.org/genproto v0.0.0-20181016170114-94acd270e44e // indirect

8
go.sum
View File

@ -240,8 +240,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.11.0 h1:HbnN00buPrE8xoiUY7zt9E0BCsdXrbp5w8BNOmjODk0=
github.com/influxdata/flux v0.11.0/go.mod h1:crguqnTMQHaGEKp93vZH+pIyTVlJYqkv8bNqSMfc22A=
github.com/influxdata/flux v0.12.0 h1:mI91GHgqb5sbsz9fesvKwbZrh3wpxLKIEeCYswRBh2A=
github.com/influxdata/flux v0.12.0/go.mod h1:81jeDcHVn1rN5uj9aQ81S72Q8ol8If7N0zM0G8TnxTE=
github.com/influxdata/goreleaser v0.86.2-0.20181010170531-0fd209ba67f5/go.mod h1:aVuBpDAT5VtjtUxzvBt8HOd0buzvvk7OX3H2iaviixg=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
@ -475,8 +475,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2I
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181121193951-91f80e683c10 h1:6aZMfwu0xab6imbp0uu++D3WXR+p0+RDYOqqb0uY8KU=
golang.org/x/tools v0.0.0-20181121193951-91f80e683c10/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2 h1:M7NLB69gFpUH4s6SJLwXiVs45aZfVjqGKynfNFKSGcI=
golang.org/x/tools v0.0.0-20181221154417-3ad2d988d5e2/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6 h1:4WsZyVtkthqrHTbDCJfiTs8IWNYE4uvsSDgaV6xpp+o=

View File

@ -490,7 +490,7 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
measurementStats := make(map[string]Stats)
measurementName := ""
return tbl.Do(func(er flux.ColReader) error {
return tbl.DoArrow(func(er flux.ArrowColReader) error {
var pointTime time.Time
var points models.Points
var tags models.Tags
@ -502,16 +502,16 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
for j, col := range er.Cols() {
switch {
case col.Label == spec.MeasurementColumn:
measurementName = er.Strings(j)[i]
measurementName = string(er.Strings(j).Value(i))
case col.Label == timeColLabel:
pointTime = er.Times(j)[i].Time()
pointTime = execute.ValueForRowArrow(er, i, j).Time().Time()
case isTag[j]:
if col.Type != flux.TString {
return errors.New("invalid type for tag column")
}
// TODO(docmerlin): instead of doing this sort of thing, it would be nice if we had a way that allocated a lot less.
// Note that tags are 2-tuples of key and then value.
tags = append(tags, models.NewTag([]byte(col.Label), []byte(er.Strings(j)[i])))
tags = append(tags, models.NewTag([]byte(col.Label), er.Strings(j).Value(i)))
}
}
@ -567,7 +567,7 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
return err
}
points = append(points, pt)
if err := execute.AppendRecord(i, er, builder); err != nil {
if err := execute.AppendRecordArrow(i, er, builder); err != nil {
return err
}
}
@ -576,7 +576,7 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
})
}
func defaultFieldMapping(er flux.ColReader, row int) (values.Object, error) {
func defaultFieldMapping(er flux.ArrowColReader, row int) (values.Object, error) {
fieldColumnIdx := execute.ColIdx(defaultFieldColLabel, er.Cols())
valueColumnIdx := execute.ColIdx(execute.DefaultValueColLabel, er.Cols())
@ -588,29 +588,11 @@ func defaultFieldMapping(er flux.ColReader, row int) (values.Object, error) {
return nil, errors.New("table has no _value column")
}
var value values.Value
valueColumnType := er.Cols()[valueColumnIdx].Type
switch valueColumnType {
case flux.TFloat:
value = values.NewFloat(er.Floats(valueColumnIdx)[row])
case flux.TInt:
value = values.NewInt(er.Ints(valueColumnIdx)[row])
case flux.TUInt:
value = values.NewUInt(er.UInts(valueColumnIdx)[row])
case flux.TString:
value = values.NewString(er.Strings(valueColumnIdx)[row])
case flux.TTime:
value = values.NewTime(er.Times(valueColumnIdx)[row])
case flux.TBool:
value = values.NewBool(er.Bools(valueColumnIdx)[row])
default:
return nil, fmt.Errorf("unsupported type %v for _value column", valueColumnType)
}
value := execute.ValueForRowArrow(er, row, valueColumnIdx)
fieldValueMapping := values.NewObject()
field := er.Strings(fieldColumnIdx)[row]
fieldValueMapping.Set(field, value)
field := execute.ValueForRowArrow(er, row, fieldColumnIdx)
fieldValueMapping.Set(field.Str(), value)
return fieldValueMapping, nil
}

View File

@ -1,4 +1,4 @@
package functions_test
package influxql_test
import (
"bufio"
@ -8,15 +8,12 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
ifql "github.com/influxdata/flux/influxql"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/querytest"
"github.com/influxdata/platform"
@ -25,13 +22,11 @@ import (
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/query/influxql"
platformtesting "github.com/influxdata/platform/testing"
"github.com/andreyvit/diff"
)
const generatedInfluxQLDataDir = "testdata"
var dbrpMappingSvc = mock.NewDBRPMappingService()
var dbrpMappingSvcE2E = mock.NewDBRPMappingService()
func init() {
mapping := platform.DBRPMapping{
@ -42,13 +37,13 @@ func init() {
OrganizationID: platformtesting.MustIDBase16("cadecadecadecade"),
BucketID: platformtesting.MustIDBase16("da7aba5e5eedca5e"),
}
dbrpMappingSvc.FindByFn = func(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error) {
dbrpMappingSvcE2E.FindByFn = func(ctx context.Context, cluster string, db string, rp string) (*platform.DBRPMapping, error) {
return &mapping, nil
}
dbrpMappingSvc.FindFn = func(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
dbrpMappingSvcE2E.FindFn = func(ctx context.Context, filter platform.DBRPMappingFilter) (*platform.DBRPMapping, error) {
return &mapping, nil
}
dbrpMappingSvc.FindManyFn = func(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
dbrpMappingSvcE2E.FindManyFn = func(ctx context.Context, filter platform.DBRPMappingFilter, opt ...platform.FindOptions) ([]*platform.DBRPMapping, int, error) {
return []*platform.DBRPMapping{&mapping}, 1, nil
}
}
@ -151,26 +146,6 @@ var skipTests = map[string]string{
var querier = querytest.NewQuerier()
func withEachFluxFile(t testing.TB, fn func(prefix, caseName string)) {
dir, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
path := filepath.Join(dir, "testdata")
fluxFiles, err := filepath.Glob(filepath.Join(path, "*.flux"))
if err != nil {
t.Fatalf("error searching for Flux files: %s", err)
}
for _, fluxFile := range fluxFiles {
ext := filepath.Ext(fluxFile)
prefix := fluxFile[0 : len(fluxFile)-len(ext)]
_, caseName := filepath.Split(prefix)
fn(prefix, caseName)
}
}
func withEachInfluxQLFile(t testing.TB, fn func(prefix, caseName string)) {
dir, err := os.Getwd()
if err != nil {
@ -191,27 +166,6 @@ func withEachInfluxQLFile(t testing.TB, fn func(prefix, caseName string)) {
}
}
func Test_QueryEndToEnd(t *testing.T) {
withEachFluxFile(t, func(prefix, caseName string) {
reason, skip := skipTests[caseName]
fluxName := caseName + ".flux"
influxqlName := caseName + ".influxql"
t.Run(fluxName, func(t *testing.T) {
if skip {
t.Skip(reason)
}
testFlux(t, querier, prefix, ".flux")
})
t.Run(influxqlName, func(t *testing.T) {
if skip {
t.Skip(reason)
}
testInfluxQL(t, querier, prefix, ".influxql")
})
})
}
func Test_GeneratedInfluxQLQueries(t *testing.T) {
withEachInfluxQLFile(t, func(prefix, caseName string) {
reason, skip := skipTests[caseName]
@ -225,109 +179,6 @@ func Test_GeneratedInfluxQLQueries(t *testing.T) {
})
}
func Benchmark_QueryEndToEnd(b *testing.B) {
withEachFluxFile(b, func(prefix, caseName string) {
reason, skip := skipTests[caseName]
if skip {
b.Skip(reason)
}
fluxName := caseName + ".flux"
influxqlName := caseName + ".influxql"
b.Run(fluxName, func(b *testing.B) {
if skip {
b.Skip(reason)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
testFlux(b, querier, prefix, ".flux")
}
})
b.Run(influxqlName, func(b *testing.B) {
if skip {
b.Skip(reason)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
testInfluxQL(b, querier, prefix, ".influxql")
}
})
})
}
func testFlux(t testing.TB, querier *querytest.Querier, prefix, queryExt string) {
q, err := ioutil.ReadFile(prefix + queryExt)
if err != nil {
t.Fatal(err)
}
csvInFilename := prefix + ".in.csv"
csvOut, err := ioutil.ReadFile(prefix + ".out.csv")
if err != nil {
t.Fatal(err)
}
compiler := lang.FluxCompiler{
Query: string(q),
}
req := &query.ProxyRequest{
Request: query.Request{
Compiler: querytest.FromCSVCompiler{
Compiler: compiler,
InputFile: csvInFilename,
},
},
Dialect: csv.DefaultDialect(),
}
QueryTestCheckSpec(t, querier, req, string(csvOut))
}
func testInfluxQL(t testing.TB, querier *querytest.Querier, prefix, queryExt string) {
q, err := ioutil.ReadFile(prefix + queryExt)
if err != nil {
if !os.IsNotExist(err) {
t.Fatal(err)
}
t.Skip("influxql query is missing")
}
csvInFilename := prefix + ".in.csv"
csvOut, err := ioutil.ReadFile(prefix + ".out.csv")
if err != nil {
t.Fatal(err)
}
compiler := influxql.NewCompiler(dbrpMappingSvc)
compiler.Cluster = "cluster"
compiler.DB = "db0"
compiler.Query = string(q)
req := &query.ProxyRequest{
Request: query.Request{
Compiler: querytest.FromCSVCompiler{
Compiler: compiler,
InputFile: csvInFilename,
},
},
Dialect: csv.DefaultDialect(),
}
QueryTestCheckSpec(t, querier, req, string(csvOut))
// Rerun test for InfluxQL JSON dialect
req.Dialect = new(influxql.Dialect)
jsonOut, err := ioutil.ReadFile(prefix + ".out.json")
if err != nil {
if !os.IsNotExist(err) {
t.Fatal(err)
}
t.Skip("influxql expected json is missing")
}
QueryTestCheckSpec(t, querier, req, string(jsonOut))
}
func testGeneratedInfluxQL(t testing.TB, prefix, queryExt string) {
q, err := ioutil.ReadFile(prefix + queryExt)
if err != nil {
@ -400,7 +251,7 @@ func resultsFromQuerier(querier *querytest.Querier, compiler flux.Compiler) (flu
}
func influxQLCompiler(query, filename string) querytest.FromInfluxJSONCompiler {
compiler := influxql.NewCompiler(dbrpMappingSvc)
compiler := influxql.NewCompiler(dbrpMappingSvcE2E)
compiler.Cluster = "cluster"
compiler.DB = "db0"
compiler.Query = query
@ -438,20 +289,3 @@ func jsonToResultIterator(file string) (flux.ResultIterator, error) {
}
return results, nil
}
func QueryTestCheckSpec(t testing.TB, querier *querytest.Querier, req *query.ProxyRequest, want string) {
t.Helper()
var buf bytes.Buffer
_, err := querier.Query(context.Background(), &buf, req.Request.Compiler, req.Dialect)
if err != nil {
t.Errorf("failed to run query: %v", err)
return
}
got := buf.String()
if g, w := strings.TrimSpace(got), strings.TrimSpace(want); g != w {
t.Errorf("result not as expected want(-) got (+):\n%v", diff.LineDiff(w, g))
}
}

Some files were not shown because too many files have changed in this diff Show More