229 lines
6.9 KiB
Go
229 lines
6.9 KiB
Go
package backend_test
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/golang/mock/gomock"
|
|
"github.com/influxdata/flux"
|
|
"github.com/influxdata/flux/dependencies/url"
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/authorization"
|
|
icontext "github.com/influxdata/influxdb/v2/context"
|
|
_ "github.com/influxdata/influxdb/v2/fluxinit/static"
|
|
"github.com/influxdata/influxdb/v2/inmem"
|
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
|
"github.com/influxdata/influxdb/v2/kv"
|
|
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
|
"github.com/influxdata/influxdb/v2/mock"
|
|
"github.com/influxdata/influxdb/v2/query"
|
|
"github.com/influxdata/influxdb/v2/query/control"
|
|
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
|
stdlib "github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
|
|
"github.com/influxdata/influxdb/v2/storage"
|
|
storageflux "github.com/influxdata/influxdb/v2/storage/flux"
|
|
"github.com/influxdata/influxdb/v2/task/backend"
|
|
"github.com/influxdata/influxdb/v2/task/servicetest"
|
|
"github.com/influxdata/influxdb/v2/task/taskmodel"
|
|
"github.com/influxdata/influxdb/v2/tenant"
|
|
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
|
storage2 "github.com/influxdata/influxdb/v2/v1/services/storage"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zaptest"
|
|
)
|
|
|
|
func TestAnalyticalStore(t *testing.T) {
|
|
t.Skip("https://github.com/influxdata/influxdb/issues/22920")
|
|
servicetest.TestTaskService(
|
|
t,
|
|
func(t *testing.T) (*servicetest.System, context.CancelFunc) {
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
logger := zaptest.NewLogger(t)
|
|
store := inmem.NewKVStore()
|
|
if err := all.Up(ctx, logger, store); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
tenantStore := tenant.NewStore(store)
|
|
ts := tenant.NewService(tenantStore)
|
|
|
|
authStore, err := authorization.NewStore(store)
|
|
require.NoError(t, err)
|
|
authSvc := authorization.NewService(authStore, ts)
|
|
|
|
svc := kv.NewService(logger, store, ts, kv.ServiceConfig{
|
|
FluxLanguageService: fluxlang.DefaultService,
|
|
})
|
|
|
|
metaClient := meta.NewClient(meta.NewConfig(), store)
|
|
require.NoError(t, metaClient.Open())
|
|
|
|
var (
|
|
ab = newAnalyticalBackend(t, ts.OrganizationService, ts.BucketService, metaClient)
|
|
svcStack = backend.NewAnalyticalStorage(logger, svc, ts.BucketService, svc, ab.PointsWriter(), ab.QueryService())
|
|
)
|
|
|
|
ts.BucketService = storage.NewBucketService(logger, ts.BucketService, ab.storageEngine)
|
|
|
|
authCtx := icontext.SetAuthorizer(ctx, &influxdb.Authorization{
|
|
Permissions: influxdb.OperPermissions(),
|
|
})
|
|
|
|
return &servicetest.System{
|
|
TaskControlService: svcStack,
|
|
TaskService: svcStack,
|
|
OrganizationService: ts.OrganizationService,
|
|
UserService: ts.UserService,
|
|
UserResourceMappingService: ts.UserResourceMappingService,
|
|
AuthorizationService: authSvc,
|
|
Ctx: authCtx,
|
|
CallFinishRun: true,
|
|
}, func() {
|
|
cancelFunc()
|
|
ab.Close(t)
|
|
}
|
|
},
|
|
)
|
|
}
|
|
|
|
func TestDeduplicateRuns(t *testing.T) {
|
|
ctrl := gomock.NewController(t)
|
|
defer ctrl.Finish()
|
|
|
|
logger := zaptest.NewLogger(t)
|
|
store := inmem.NewKVStore()
|
|
if err := all.Up(context.Background(), logger, store); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
tenantStore := tenant.NewStore(store)
|
|
ts := tenant.NewService(tenantStore)
|
|
|
|
metaClient := meta.NewClient(meta.NewConfig(), store)
|
|
require.NoError(t, metaClient.Open())
|
|
|
|
_, err := metaClient.CreateDatabase(platform.ID(10).String())
|
|
require.NoError(t, err)
|
|
|
|
ab := newAnalyticalBackend(t, ts.OrganizationService, ts.BucketService, metaClient)
|
|
defer ab.Close(t)
|
|
|
|
mockTS := &mock.TaskService{
|
|
FindTaskByIDFn: func(context.Context, platform.ID) (*taskmodel.Task, error) {
|
|
return &taskmodel.Task{ID: 1, OrganizationID: 20}, nil
|
|
},
|
|
FindRunsFn: func(context.Context, taskmodel.RunFilter) ([]*taskmodel.Run, int, error) {
|
|
return []*taskmodel.Run{
|
|
{ID: 2, Status: "started"},
|
|
}, 1, nil
|
|
},
|
|
}
|
|
mockTCS := &mock.TaskControlService{
|
|
FinishRunFn: func(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error) {
|
|
return &taskmodel.Run{ID: 2, TaskID: 1, Status: "success", ScheduledFor: time.Now(), StartedAt: time.Now().Add(1), FinishedAt: time.Now().Add(2)}, nil
|
|
},
|
|
}
|
|
mockBS := mock.NewBucketService()
|
|
|
|
svcStack := backend.NewAnalyticalStorage(zaptest.NewLogger(t), mockTS, mockBS, mockTCS, ab.PointsWriter(), ab.QueryService())
|
|
|
|
_, err = svcStack.FinishRun(context.Background(), 1, 2)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
runs, _, err := svcStack.FindRuns(context.Background(), taskmodel.RunFilter{Task: 1})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if len(runs) != 1 {
|
|
t.Fatalf("expected 1 run but got %d", len(runs))
|
|
}
|
|
|
|
if runs[0].Status != "success" {
|
|
t.Fatalf("expected the deduped run to be 'success', got: %s", runs[0].Status)
|
|
}
|
|
}
|
|
|
|
type analyticalBackend struct {
|
|
queryController *control.Controller
|
|
rootDir string
|
|
storageEngine *storage.Engine
|
|
}
|
|
|
|
func (ab *analyticalBackend) PointsWriter() storage.PointsWriter {
|
|
return ab.storageEngine
|
|
}
|
|
|
|
func (ab *analyticalBackend) QueryService() query.QueryService {
|
|
return query.QueryServiceBridge{AsyncQueryService: ab.queryController}
|
|
}
|
|
|
|
func (ab *analyticalBackend) Close(t *testing.T) {
|
|
if err := ab.queryController.Shutdown(context.Background()); err != nil {
|
|
t.Error(err)
|
|
}
|
|
if err := ab.storageEngine.Close(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
if err := os.RemoveAll(ab.rootDir); err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
func newAnalyticalBackend(t *testing.T, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService, metaClient storage.MetaClient) *analyticalBackend {
|
|
// Mostly copied out of cmd/influxd/main.go.
|
|
logger := zaptest.NewLogger(t)
|
|
|
|
rootDir := t.TempDir()
|
|
|
|
engine := storage.NewEngine(rootDir, storage.NewConfig(), storage.WithMetaClient(metaClient))
|
|
engine.WithLogger(logger)
|
|
|
|
if err := engine.Open(context.Background()); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
defer func() {
|
|
if t.Failed() {
|
|
engine.Close()
|
|
}
|
|
}()
|
|
|
|
const (
|
|
concurrencyQuota = 10
|
|
memoryBytesQuotaPerQuery = 1e6
|
|
queueSize = 10
|
|
)
|
|
|
|
// TODO(adam): do we need a proper secret service here?
|
|
storageStore := storage2.NewStore(engine.TSDBStore(), engine.MetaClient())
|
|
readsReader := storageflux.NewReader(storageStore)
|
|
|
|
deps, err := stdlib.NewDependencies(readsReader, engine, bucketSvc, orgSvc, nil, nil, stdlib.WithURLValidator(url.PassValidator{}))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
cc := control.Config{
|
|
ExecutorDependencies: []flux.Dependency{deps},
|
|
ConcurrencyQuota: concurrencyQuota,
|
|
MemoryBytesQuotaPerQuery: int64(memoryBytesQuotaPerQuery),
|
|
QueueSize: queueSize,
|
|
}
|
|
|
|
queryController, err := control.New(cc, logger.With(zap.String("service", "storage-reads")))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
return &analyticalBackend{
|
|
queryController: queryController,
|
|
rootDir: rootDir,
|
|
storageEngine: engine,
|
|
}
|
|
}
|