make necessary changes

pull/10616/head
Lyon Hill 2018-08-07 16:51:33 -06:00
parent 02d1b6507b
commit 212210eaec
4 changed files with 27 additions and 53 deletions

View File

@ -20,6 +20,7 @@ import (
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/kit/prom"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/query/control"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/task"
@ -136,8 +137,7 @@ func platformF(cmd *cobra.Command, args []string) {
{
boltStore, err := taskbolt.New(c.DB(), "tasks")
if err != nil {
logger.Error("failed opening task bolt", zap.Error(err))
os.Exit(1)
logger.Fatal("failed opening task bolt", zap.Error(err))
}
executor := taskexecutor.NewQueryServiceExecutor(logger, queryService, boltStore)

View File

@ -2,7 +2,6 @@ package http
import (
"context"
"fmt"
nethttp "net/http"
"strings"
@ -92,7 +91,6 @@ func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request
}
if strings.HasPrefix(r.URL.Path, "/v1/tasks") {
fmt.Println("handled?")
h.TaskHandler.ServeHTTP(w, r)
}

View File

@ -11,8 +11,9 @@ import (
)
type Coordinator struct {
backend.Store
sch backend.Scheduler
st backend.Store
limit int
}
@ -28,7 +29,7 @@ func WithLimit(i int) Option {
func New(scheduler backend.Scheduler, st backend.Store, opts ...Option) backend.Store {
c := &Coordinator{
sch: scheduler,
st: st,
Store: st,
limit: 1000,
}
@ -40,23 +41,23 @@ func New(scheduler backend.Scheduler, st backend.Store, opts ...Option) backend.
}
func (c *Coordinator) CreateTask(ctx context.Context, org, user platform.ID, script string) (platform.ID, error) {
id, err := c.st.CreateTask(ctx, org, user, script)
if err != nil {
return id, err
}
task, err := c.st.FindTaskByID(ctx, id)
if err != nil {
return id, err
}
opt, err := options.FromScript(script)
if err != nil {
return nil, err
}
id, err := c.Store.CreateTask(ctx, org, user, script)
if err != nil {
return id, err
}
task, err := c.Store.FindTaskByID(ctx, id)
if err != nil {
return id, err
}
if err := c.sch.ClaimTask(task, time.Now().UTC().Unix(), &opt); err != nil {
_, delErr := c.st.DeleteTask(ctx, id)
_, delErr := c.Store.DeleteTask(ctx, id)
if delErr != nil {
return id, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr)
}
@ -72,16 +73,16 @@ func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript
return err
}
if err := c.st.ModifyTask(ctx, id, newScript); err != nil {
if err := c.Store.ModifyTask(ctx, id, newScript); err != nil {
return err
}
task, err := c.st.FindTaskByID(ctx, id)
task, err := c.Store.FindTaskByID(ctx, id)
if err != nil {
return err
}
meta, err := c.st.FindTaskMetaByID(ctx, id)
meta, err := c.Store.FindTaskMetaByID(ctx, id)
if err != nil {
return err
}
@ -97,20 +98,12 @@ func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript
return nil
}
func (c *Coordinator) ListTasks(ctx context.Context, params backend.TaskSearchParams) ([]backend.StoreTask, error) {
return c.st.ListTasks(ctx, params)
}
func (c *Coordinator) FindTaskByID(ctx context.Context, id platform.ID) (*backend.StoreTask, error) {
return c.st.FindTaskByID(ctx, id)
}
func (c *Coordinator) EnableTask(ctx context.Context, id platform.ID) error {
if err := c.st.EnableTask(ctx, id); err != nil {
if err := c.Store.EnableTask(ctx, id); err != nil {
return err
}
task, err := c.st.FindTaskByID(ctx, id)
task, err := c.Store.FindTaskByID(ctx, id)
if err != nil {
return err
}
@ -128,35 +121,23 @@ func (c *Coordinator) EnableTask(ctx context.Context, id platform.ID) error {
}
func (c *Coordinator) DisableTask(ctx context.Context, id platform.ID) error {
if err := c.st.DisableTask(ctx, id); err != nil {
if err := c.Store.DisableTask(ctx, id); err != nil {
return err
}
return c.sch.ReleaseTask(id)
}
func (c *Coordinator) FindTaskMetaByID(ctx context.Context, id platform.ID) (*backend.StoreTaskMeta, error) {
return c.st.FindTaskMetaByID(ctx, id)
}
func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) {
if err := c.sch.ReleaseTask(id); err != nil {
return false, err
}
return c.st.DeleteTask(ctx, id)
}
func (c *Coordinator) CreateRun(ctx context.Context, taskID platform.ID, now int64) (backend.QueuedRun, error) {
return c.CreateRun(ctx, taskID, now)
}
func (c *Coordinator) FinishRun(ctx context.Context, taskID, runID platform.ID) error {
return c.FinishRun(ctx, taskID, runID)
return c.Store.DeleteTask(ctx, id)
}
func (c *Coordinator) DeleteOrg(ctx context.Context, orgID platform.ID) error {
orgTasks, err := c.st.ListTasks(ctx, backend.TaskSearchParams{
orgTasks, err := c.Store.ListTasks(ctx, backend.TaskSearchParams{
Org: orgID,
})
if err != nil {
@ -169,11 +150,11 @@ func (c *Coordinator) DeleteOrg(ctx context.Context, orgID platform.ID) error {
}
}
return c.st.DeleteOrg(ctx, orgID)
return c.Store.DeleteOrg(ctx, orgID)
}
func (c *Coordinator) DeleteUser(ctx context.Context, userID platform.ID) error {
userTasks, err := c.st.ListTasks(ctx, backend.TaskSearchParams{
userTasks, err := c.Store.ListTasks(ctx, backend.TaskSearchParams{
User: userID,
})
if err != nil {
@ -186,9 +167,5 @@ func (c *Coordinator) DeleteUser(ctx context.Context, userID platform.ID) error
}
}
return c.st.DeleteUser(ctx, userID)
}
func (c *Coordinator) Close() error {
return c.st.Close()
return c.Store.DeleteUser(ctx, userID)
}

View File

@ -126,5 +126,4 @@ func TestCoordinator(t *testing.T) {
if task.Script != newScript {
t.Fatal("task sent to scheduler doesnt match task created")
}
}