From 110fcc46a6b8e4cb2abf356ac9f99d4769973e7d Mon Sep 17 00:00:00 2001 From: Anthony Lapenna Date: Tue, 6 Nov 2018 22:49:48 +1300 Subject: [PATCH] feat(api): revamp scheduling to introduce system schedules (#2433) * feat(api): revamp scheduling to introduce system schedules * fix(api): fix linting issues * fix(api): fix lint issues * refactor(api): fix lint issues --- api/bolt/schedule/schedule.go | 26 ++++ api/cmd/portainer/main.go | 121 ++++++++++++------ ..._endpoint_sync.go => job_endpoint_sync.go} | 94 ++++++++------ api/cron/job_script_execution.go | 76 +++++++++++ api/cron/job_snapshot.go | 84 ++++++++++++ api/cron/scheduler.go | 74 +++++------ api/cron/task_script.go | 63 --------- api/cron/task_snapshot.go | 61 --------- api/http/handler/schedules/handler.go | 11 -- api/http/handler/schedules/schedule_create.go | 29 +++-- api/http/handler/schedules/schedule_delete.go | 12 +- api/http/handler/schedules/schedule_update.go | 28 ++-- api/http/handler/settings/handler.go | 1 + api/http/handler/settings/settings_update.go | 28 +++- api/http/server.go | 1 + api/portainer.go | 63 +++++++-- 16 files changed, 475 insertions(+), 297 deletions(-) rename api/cron/{task_endpoint_sync.go => job_endpoint_sync.go} (67%) create mode 100644 api/cron/job_script_execution.go create mode 100644 api/cron/job_snapshot.go delete mode 100644 api/cron/task_script.go delete mode 100644 api/cron/task_snapshot.go diff --git a/api/bolt/schedule/schedule.go b/api/bolt/schedule/schedule.go index dab557dce..db824768f 100644 --- a/api/bolt/schedule/schedule.go +++ b/api/bolt/schedule/schedule.go @@ -77,6 +77,32 @@ func (service *Service) Schedules() ([]portainer.Schedule, error) { return schedules, err } +// SchedulesByJobType return a array containing all the schedules +// with the specified JobType. +func (service *Service) SchedulesByJobType(jobType portainer.JobType) ([]portainer.Schedule, error) { + var schedules = make([]portainer.Schedule, 0) + + err := service.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(BucketName)) + + cursor := bucket.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + var schedule portainer.Schedule + err := internal.UnmarshalObject(v, &schedule) + if err != nil { + return err + } + if schedule.JobType == jobType { + schedules = append(schedules, schedule) + } + } + + return nil + }) + + return schedules, err +} + // CreateSchedule assign an ID to a new schedule and saves it. func (service *Service) CreateSchedule(schedule *portainer.Schedule) error { return service.db.Update(func(tx *bolt.Tx) error { diff --git a/api/cmd/portainer/main.go b/api/cmd/portainer/main.go index 9e344dde1..b0e7a1a5d 100644 --- a/api/cmd/portainer/main.go +++ b/api/cmd/portainer/main.go @@ -110,39 +110,80 @@ func initSnapshotter(clientFactory *docker.ClientFactory) portainer.Snapshotter return docker.NewSnapshotter(clientFactory) } -func initJobScheduler(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter, flags *portainer.CLIFlags) (portainer.JobScheduler, error) { - jobScheduler := cron.NewJobScheduler() +func initJobScheduler() portainer.JobScheduler { + return cron.NewJobScheduler() +} - if *flags.ExternalEndpoints != "" { - log.Println("Using external endpoint definition. Endpoint management via the API will be disabled.") - - endpointSyncTaskContext := &cron.EndpointSyncTaskContext{ - EndpointService: endpointService, - EndpointFilePath: *flags.ExternalEndpoints, - } - endpointSyncTask := cron.NewEndpointSyncTask(endpointSyncTaskContext) - - err := jobScheduler.ScheduleTask("@every "+*flags.SyncInterval, endpointSyncTask) - if err != nil { - return nil, err - } +func loadSnapshotSystemSchedule(jobScheduler portainer.JobScheduler, snapshotter portainer.Snapshotter, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, flags *portainer.CLIFlags) error { + if !*flags.Snapshot { + return nil } - if *flags.Snapshot { - - endpointSnapshotTaskContext := &cron.SnapshotTaskContext{ - EndpointService: endpointService, - Snapshotter: snapshotter, - } - endpointSnapshotTask := cron.NewSnapshotTask(endpointSnapshotTaskContext) - - err := jobScheduler.ScheduleTask("@every "+*flags.SnapshotInterval, endpointSnapshotTask) - if err != nil { - return nil, err - } + schedules, err := scheduleService.SchedulesByJobType(portainer.SnapshotJobType) + if err != nil { + return err } - return jobScheduler, nil + if len(schedules) != 0 { + return nil + } + + snapshotJob := &portainer.SnapshotJob{} + + snapshotSchedule := &portainer.Schedule{ + ID: portainer.ScheduleID(scheduleService.GetNextIdentifier()), + Name: "system_snapshot", + CronExpression: "@every " + *flags.SnapshotInterval, + JobType: portainer.SnapshotJobType, + SnapshotJob: snapshotJob, + } + + snapshotJobContext := cron.NewSnapshotJobContext(endpointService, snapshotter) + snapshotJobRunner := cron.NewSnapshotJobRunner(snapshotJob, snapshotJobContext) + + err = jobScheduler.CreateSchedule(snapshotSchedule, snapshotJobRunner) + if err != nil { + return err + } + + return scheduleService.CreateSchedule(snapshotSchedule) +} + +func loadEndpointSyncSystemSchedule(jobScheduler portainer.JobScheduler, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, flags *portainer.CLIFlags) error { + if *flags.ExternalEndpoints == "" { + return nil + } + + log.Println("Using external endpoint definition. Endpoint management via the API will be disabled.") + + schedules, err := scheduleService.SchedulesByJobType(portainer.EndpointSyncJobType) + if err != nil { + return err + } + + if len(schedules) != 0 { + return nil + } + + endpointSyncJob := &portainer.EndpointSyncJob{} + + endointSyncSchedule := &portainer.Schedule{ + ID: portainer.ScheduleID(scheduleService.GetNextIdentifier()), + Name: "system_endpointsync", + CronExpression: "@every " + *flags.SyncInterval, + JobType: portainer.EndpointSyncJobType, + EndpointSyncJob: endpointSyncJob, + } + + endpointSyncJobContext := cron.NewEndpointSyncJobContext(endpointService, *flags.ExternalEndpoints) + endpointSyncJobRunner := cron.NewEndpointSyncJobRunner(endpointSyncJob, endpointSyncJobContext) + + err = jobScheduler.CreateSchedule(endointSyncSchedule, endpointSyncJobRunner) + if err != nil { + return err + } + + return scheduleService.CreateSchedule(endointSyncSchedule) } func loadSchedulesFromDatabase(jobScheduler portainer.JobScheduler, jobService portainer.JobService, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, fileService portainer.FileService) error { @@ -152,16 +193,11 @@ func loadSchedulesFromDatabase(jobScheduler portainer.JobScheduler, jobService p } for _, schedule := range schedules { - taskContext := &cron.ScriptTaskContext{ - JobService: jobService, - EndpointService: endpointService, - FileService: fileService, - ScheduleID: schedule.ID, - TargetEndpoints: schedule.Endpoints, - } - schedule.Task.(cron.ScriptTask).SetContext(taskContext) - err = jobScheduler.ScheduleTask(schedule.CronExpression, schedule.Task) + jobContext := cron.NewScriptExecutionJobContext(jobService, endpointService, fileService) + jobRunner := cron.NewScriptExecutionJobRunner(schedule.ScriptExecutionJob, jobContext) + + err = jobScheduler.CreateSchedule(&schedule, jobRunner) if err != nil { return err } @@ -455,12 +491,19 @@ func main() { snapshotter := initSnapshotter(clientFactory) - jobScheduler, err := initJobScheduler(store.EndpointService, snapshotter, flags) + jobScheduler := initJobScheduler() + + err = loadSchedulesFromDatabase(jobScheduler, jobService, store.ScheduleService, store.EndpointService, fileService) if err != nil { log.Fatal(err) } - err = loadSchedulesFromDatabase(jobScheduler, jobService, store.ScheduleService, store.EndpointService, fileService) + err = loadEndpointSyncSystemSchedule(jobScheduler, store.ScheduleService, store.EndpointService, flags) + if err != nil { + log.Fatal(err) + } + + err = loadSnapshotSystemSchedule(jobScheduler, snapshotter, store.ScheduleService, store.EndpointService, flags) if err != nil { log.Fatal(err) } diff --git a/api/cron/task_endpoint_sync.go b/api/cron/job_endpoint_sync.go similarity index 67% rename from api/cron/task_endpoint_sync.go rename to api/cron/job_endpoint_sync.go index 45b2474e7..d6ff92173 100644 --- a/api/cron/task_endpoint_sync.go +++ b/api/cron/job_endpoint_sync.go @@ -9,48 +9,68 @@ import ( "github.com/portainer/portainer" ) -type ( - // EndpointSyncTask represents a task used to synchronize endpoints - // based on an external file. It can be scheduled. - EndpointSyncTask struct { - context *EndpointSyncTaskContext - } +// EndpointSyncJobRunner is used to run a EndpointSyncJob +type EndpointSyncJobRunner struct { + job *portainer.EndpointSyncJob + context *EndpointSyncJobContext +} - // EndpointSyncTaskContext represents the context required for the execution - // of an EndpointSyncTask. - EndpointSyncTaskContext struct { - EndpointService portainer.EndpointService - EndpointFilePath string - } +// EndpointSyncJobContext represents the context of execution of a EndpointSyncJob +type EndpointSyncJobContext struct { + endpointService portainer.EndpointService + endpointFilePath string +} - synchronization struct { - endpointsToCreate []*portainer.Endpoint - endpointsToUpdate []*portainer.Endpoint - endpointsToDelete []*portainer.Endpoint +// NewEndpointSyncJobContext returns a new context that can be used to execute a EndpointSyncJob +func NewEndpointSyncJobContext(endpointService portainer.EndpointService, endpointFilePath string) *EndpointSyncJobContext { + return &EndpointSyncJobContext{ + endpointService: endpointService, + endpointFilePath: endpointFilePath, } +} - fileEndpoint struct { - Name string `json:"Name"` - URL string `json:"URL"` - TLS bool `json:"TLS,omitempty"` - TLSSkipVerify bool `json:"TLSSkipVerify,omitempty"` - TLSCACert string `json:"TLSCACert,omitempty"` - TLSCert string `json:"TLSCert,omitempty"` - TLSKey string `json:"TLSKey,omitempty"` - } -) - -// NewEndpointSyncTask creates a new EndpointSyncTask using the specified -// context. -func NewEndpointSyncTask(context *EndpointSyncTaskContext) EndpointSyncTask { - return EndpointSyncTask{ +// NewEndpointSyncJobRunner returns a new runner that can be scheduled +func NewEndpointSyncJobRunner(job *portainer.EndpointSyncJob, context *EndpointSyncJobContext) *EndpointSyncJobRunner { + return &EndpointSyncJobRunner{ + job: job, context: context, } } +type synchronization struct { + endpointsToCreate []*portainer.Endpoint + endpointsToUpdate []*portainer.Endpoint + endpointsToDelete []*portainer.Endpoint +} + +type fileEndpoint struct { + Name string `json:"Name"` + URL string `json:"URL"` + TLS bool `json:"TLS,omitempty"` + TLSSkipVerify bool `json:"TLSSkipVerify,omitempty"` + TLSCACert string `json:"TLSCACert,omitempty"` + TLSCert string `json:"TLSCert,omitempty"` + TLSKey string `json:"TLSKey,omitempty"` +} + +// GetScheduleID returns the schedule identifier associated to the runner +func (runner *EndpointSyncJobRunner) GetScheduleID() portainer.ScheduleID { + return runner.job.ScheduleID +} + +// SetScheduleID sets the schedule identifier associated to the runner +func (runner *EndpointSyncJobRunner) SetScheduleID(ID portainer.ScheduleID) { + runner.job.ScheduleID = ID +} + +// GetJobType returns the job type associated to the runner +func (runner *EndpointSyncJobRunner) GetJobType() portainer.JobType { + return portainer.EndpointSyncJobType +} + // Run triggers the execution of the endpoint synchronization process. -func (task EndpointSyncTask) Run() { - data, err := ioutil.ReadFile(task.context.EndpointFilePath) +func (runner *EndpointSyncJobRunner) Run() { + data, err := ioutil.ReadFile(runner.context.endpointFilePath) if endpointSyncError(err) { return } @@ -62,11 +82,11 @@ func (task EndpointSyncTask) Run() { } if len(fileEndpoints) == 0 { - log.Println("background task error (endpoint synchronization). External endpoint source is empty") + log.Println("background job error (endpoint synchronization). External endpoint source is empty") return } - storedEndpoints, err := task.context.EndpointService.Endpoints() + storedEndpoints, err := runner.context.endpointService.Endpoints() if endpointSyncError(err) { return } @@ -75,7 +95,7 @@ func (task EndpointSyncTask) Run() { sync := prepareSyncData(storedEndpoints, convertedFileEndpoints) if sync.requireSync() { - err = task.context.EndpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete) + err = runner.context.endpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete) if endpointSyncError(err) { return } @@ -85,7 +105,7 @@ func (task EndpointSyncTask) Run() { func endpointSyncError(err error) bool { if err != nil { - log.Printf("background task error (endpoint synchronization). Unable to synchronize endpoints (err=%s)\n", err) + log.Printf("background job error (endpoint synchronization). Unable to synchronize endpoints (err=%s)\n", err) return true } return false diff --git a/api/cron/job_script_execution.go b/api/cron/job_script_execution.go new file mode 100644 index 000000000..aafbc4892 --- /dev/null +++ b/api/cron/job_script_execution.go @@ -0,0 +1,76 @@ +package cron + +import ( + "log" + + "github.com/portainer/portainer" +) + +// ScriptExecutionJobRunner is used to run a ScriptExecutionJob +type ScriptExecutionJobRunner struct { + job *portainer.ScriptExecutionJob + context *ScriptExecutionJobContext +} + +// ScriptExecutionJobContext represents the context of execution of a ScriptExecutionJob +type ScriptExecutionJobContext struct { + jobService portainer.JobService + endpointService portainer.EndpointService + fileService portainer.FileService +} + +// NewScriptExecutionJobContext returns a new context that can be used to execute a ScriptExecutionJob +func NewScriptExecutionJobContext(jobService portainer.JobService, endpointService portainer.EndpointService, fileService portainer.FileService) *ScriptExecutionJobContext { + return &ScriptExecutionJobContext{ + jobService: jobService, + endpointService: endpointService, + fileService: fileService, + } +} + +// NewScriptExecutionJobRunner returns a new runner that can be scheduled +func NewScriptExecutionJobRunner(job *portainer.ScriptExecutionJob, context *ScriptExecutionJobContext) *ScriptExecutionJobRunner { + return &ScriptExecutionJobRunner{ + job: job, + context: context, + } +} + +// Run triggers the execution of the job. +// It will iterate through all the endpoints specified in the context to +// execute the script associated to the job. +func (runner *ScriptExecutionJobRunner) Run() { + scriptFile, err := runner.context.fileService.GetFileContent(runner.job.ScriptPath) + if err != nil { + log.Printf("scheduled job error (script execution). Unable to retrieve script file (err=%s)\n", err) + return + } + + for _, endpointID := range runner.job.Endpoints { + endpoint, err := runner.context.endpointService.Endpoint(endpointID) + if err != nil { + log.Printf("scheduled job error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err) + return + } + + err = runner.context.jobService.Execute(endpoint, "", runner.job.Image, scriptFile) + if err != nil { + log.Printf("scheduled job error (script execution). Unable to execute scrtip (endpoint=%s) (err=%s)\n", endpoint.Name, err) + } + } +} + +// GetScheduleID returns the schedule identifier associated to the runner +func (runner *ScriptExecutionJobRunner) GetScheduleID() portainer.ScheduleID { + return runner.job.ScheduleID +} + +// SetScheduleID sets the schedule identifier associated to the runner +func (runner *ScriptExecutionJobRunner) SetScheduleID(ID portainer.ScheduleID) { + runner.job.ScheduleID = ID +} + +// GetJobType returns the job type associated to the runner +func (runner *ScriptExecutionJobRunner) GetJobType() portainer.JobType { + return portainer.ScriptExecutionJobType +} diff --git a/api/cron/job_snapshot.go b/api/cron/job_snapshot.go new file mode 100644 index 000000000..5918b47aa --- /dev/null +++ b/api/cron/job_snapshot.go @@ -0,0 +1,84 @@ +package cron + +import ( + "log" + + "github.com/portainer/portainer" +) + +// SnapshotJobRunner is used to run a SnapshotJob +type SnapshotJobRunner struct { + job *portainer.SnapshotJob + context *SnapshotJobContext +} + +// SnapshotJobContext represents the context of execution of a SnapshotJob +type SnapshotJobContext struct { + endpointService portainer.EndpointService + snapshotter portainer.Snapshotter +} + +// NewSnapshotJobContext returns a new context that can be used to execute a SnapshotJob +func NewSnapshotJobContext(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) *SnapshotJobContext { + return &SnapshotJobContext{ + endpointService: endpointService, + snapshotter: snapshotter, + } +} + +// NewSnapshotJobRunner returns a new runner that can be scheduled +func NewSnapshotJobRunner(job *portainer.SnapshotJob, context *SnapshotJobContext) *SnapshotJobRunner { + return &SnapshotJobRunner{ + job: job, + context: context, + } +} + +// GetScheduleID returns the schedule identifier associated to the runner +func (runner *SnapshotJobRunner) GetScheduleID() portainer.ScheduleID { + return runner.job.ScheduleID +} + +// SetScheduleID sets the schedule identifier associated to the runner +func (runner *SnapshotJobRunner) SetScheduleID(ID portainer.ScheduleID) { + runner.job.ScheduleID = ID +} + +// GetJobType returns the job type associated to the runner +func (runner *SnapshotJobRunner) GetJobType() portainer.JobType { + return portainer.EndpointSyncJobType +} + +// Run triggers the execution of the job. +// It will iterate through all the endpoints available in the database to +// create a snapshot of each one of them. +func (runner *SnapshotJobRunner) Run() { + endpoints, err := runner.context.endpointService.Endpoints() + if err != nil { + log.Printf("background job error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err) + return + } + + for _, endpoint := range endpoints { + if endpoint.Type == portainer.AzureEnvironment { + continue + } + + snapshot, err := runner.context.snapshotter.CreateSnapshot(&endpoint) + endpoint.Status = portainer.EndpointStatusUp + if err != nil { + log.Printf("background job error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) + endpoint.Status = portainer.EndpointStatusDown + } + + if snapshot != nil { + endpoint.Snapshots = []portainer.Snapshot{*snapshot} + } + + err = runner.context.endpointService.UpdateEndpoint(endpoint.ID, &endpoint) + if err != nil { + log.Printf("background job error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) + return + } + } +} diff --git a/api/cron/scheduler.go b/api/cron/scheduler.go index 6a6dc0159..f329d00bd 100644 --- a/api/cron/scheduler.go +++ b/api/cron/scheduler.go @@ -5,52 +5,49 @@ import ( "github.com/robfig/cron" ) -// JobScheduler represents a service for managing crons. +// JobScheduler represents a service for managing crons type JobScheduler struct { cron *cron.Cron } -// NewJobScheduler initializes a new service. +// NewJobScheduler initializes a new service func NewJobScheduler() *JobScheduler { return &JobScheduler{ cron: cron.New(), } } -// UpdateScheduledTask updates a specific scheduled task by re-creating a new cron -// and adding all the existing jobs. It will then re-schedule the new task -// based on the updatedTask parameter. +// CreateSchedule schedules the execution of a job via a runner +func (scheduler *JobScheduler) CreateSchedule(schedule *portainer.Schedule, runner portainer.JobRunner) error { + runner.SetScheduleID(schedule.ID) + return scheduler.cron.AddJob(schedule.CronExpression, runner) +} + +// UpdateSchedule updates a specific scheduled job by re-creating a new cron +// and adding all the existing jobs. It will then re-schedule the new job +// via the specified JobRunner parameter. // NOTE: the cron library do not support updating schedules directly -// hence the work-around. -func (scheduler *JobScheduler) UpdateScheduledTask(scheduleID portainer.ScheduleID, cronExpression string, updatedTask portainer.Task) error { - jobs := scheduler.cron.Entries() +// hence the work-around +func (scheduler *JobScheduler) UpdateSchedule(schedule *portainer.Schedule, runner portainer.JobRunner) error { + cronEntries := scheduler.cron.Entries() newCron := cron.New() - for _, job := range jobs { + for _, entry := range cronEntries { - switch task := job.Job.(type) { - case ScriptTask: - if task.context.ScheduleID == scheduleID { - err := newCron.AddJob(cronExpression, updatedTask) - if err != nil { - return err - } + if entry.Job.(portainer.JobRunner).GetScheduleID() == schedule.ID { - continue + var jobRunner cron.Job = runner + if entry.Job.(portainer.JobRunner).GetJobType() == portainer.SnapshotJobType { + jobRunner = entry.Job } - case SnapshotTask: - _, ok := updatedTask.(SnapshotTask) - if ok { - err := newCron.AddJob(cronExpression, job.Job) - if err != nil { - return err - } - continue + err := newCron.AddJob(schedule.CronExpression, jobRunner) + if err != nil { + return err } } - newCron.Schedule(job.Schedule, job.Job) + newCron.Schedule(entry.Schedule, entry.Job) } scheduler.cron.Stop() @@ -59,25 +56,21 @@ func (scheduler *JobScheduler) UpdateScheduledTask(scheduleID portainer.Schedule return nil } -// UnscheduleTask remove a schedule by re-creating a new cron +// RemoveSchedule remove a scheduled job by re-creating a new cron // and adding all the existing jobs except for the one specified via scheduleID. // NOTE: the cron library do not support removing schedules directly -// hence the work-around. -func (scheduler *JobScheduler) UnscheduleTask(scheduleID portainer.ScheduleID) { - jobs := scheduler.cron.Entries() - +// hence the work-around +func (scheduler *JobScheduler) RemoveSchedule(scheduleID portainer.ScheduleID) { + cronEntries := scheduler.cron.Entries() newCron := cron.New() - for _, job := range jobs { + for _, entry := range cronEntries { - switch task := job.Job.(type) { - case ScriptTask: - if task.context.ScheduleID == scheduleID { - continue - } + if entry.Job.(portainer.JobRunner).GetScheduleID() == scheduleID { + continue } - newCron.Schedule(job.Schedule, job.Job) + newCron.Schedule(entry.Schedule, entry.Job) } scheduler.cron.Stop() @@ -85,11 +78,6 @@ func (scheduler *JobScheduler) UnscheduleTask(scheduleID portainer.ScheduleID) { scheduler.cron.Start() } -// ScheduleTask adds a new task to be scheduled in the cron. -func (scheduler *JobScheduler) ScheduleTask(cronExpression string, task portainer.Task) error { - return scheduler.cron.AddJob(cronExpression, task) -} - // Start starts the scheduled jobs func (scheduler *JobScheduler) Start() { if len(scheduler.cron.Entries()) > 0 { diff --git a/api/cron/task_script.go b/api/cron/task_script.go deleted file mode 100644 index fcde39ef5..000000000 --- a/api/cron/task_script.go +++ /dev/null @@ -1,63 +0,0 @@ -package cron - -import ( - "log" - - "github.com/portainer/portainer" -) - -// ScriptTaskContext represents the context required for the execution -// of a ScriptTask. -type ScriptTaskContext struct { - JobService portainer.JobService - EndpointService portainer.EndpointService - FileService portainer.FileService - ScheduleID portainer.ScheduleID - TargetEndpoints []portainer.EndpointID -} - -// ScriptTask represents a task used to execute a script inside a privileged -// container. It can be scheduled. -type ScriptTask struct { - Image string - ScriptPath string - context *ScriptTaskContext -} - -// NewScriptTask creates a new ScriptTask using the specified context. -func NewScriptTask(image, scriptPath string, context *ScriptTaskContext) ScriptTask { - return ScriptTask{ - Image: image, - ScriptPath: scriptPath, - context: context, - } -} - -// SetContext can be used to set/override the task context -func (task ScriptTask) SetContext(context *ScriptTaskContext) { - task.context = context -} - -// Run triggers the execution of the task. -// It will iterate through all the endpoints specified in the context to -// execute the script associated to the task. -func (task ScriptTask) Run() { - scriptFile, err := task.context.FileService.GetFileContent(task.ScriptPath) - if err != nil { - log.Printf("scheduled task error (script execution). Unable to retrieve script file (err=%s)\n", err) - return - } - - for _, endpointID := range task.context.TargetEndpoints { - endpoint, err := task.context.EndpointService.Endpoint(endpointID) - if err != nil { - log.Printf("scheduled task error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err) - return - } - - err = task.context.JobService.Execute(endpoint, "", task.Image, scriptFile) - if err != nil { - log.Printf("scheduled task error (script execution). Unable to execute scrtip (endpoint=%s) (err=%s)\n", endpoint.Name, err) - } - } -} diff --git a/api/cron/task_snapshot.go b/api/cron/task_snapshot.go deleted file mode 100644 index df73d37e1..000000000 --- a/api/cron/task_snapshot.go +++ /dev/null @@ -1,61 +0,0 @@ -package cron - -import ( - "log" - - "github.com/portainer/portainer" -) - -// SnapshotTaskContext represents the context required for the execution -// of a SnapshotTask. -type SnapshotTaskContext struct { - EndpointService portainer.EndpointService - Snapshotter portainer.Snapshotter -} - -// SnapshotTask represents a task used to create endpoint snapshots. -// It can be scheduled. -type SnapshotTask struct { - context *SnapshotTaskContext -} - -// NewSnapshotTask creates a new ScriptTask using the specified context. -func NewSnapshotTask(context *SnapshotTaskContext) SnapshotTask { - return SnapshotTask{ - context: context, - } -} - -// Run triggers the execution of the task. -// It will iterate through all the endpoints available in the database to -// create a snapshot of each one of them. -func (task SnapshotTask) Run() { - endpoints, err := task.context.EndpointService.Endpoints() - if err != nil { - log.Printf("background task error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err) - return - } - - for _, endpoint := range endpoints { - if endpoint.Type == portainer.AzureEnvironment { - continue - } - - snapshot, err := task.context.Snapshotter.CreateSnapshot(&endpoint) - endpoint.Status = portainer.EndpointStatusUp - if err != nil { - log.Printf("background task error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) - endpoint.Status = portainer.EndpointStatusDown - } - - if snapshot != nil { - endpoint.Snapshots = []portainer.Snapshot{*snapshot} - } - - err = task.context.EndpointService.UpdateEndpoint(endpoint.ID, &endpoint) - if err != nil { - log.Printf("background task error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err) - return - } - } -} diff --git a/api/http/handler/schedules/handler.go b/api/http/handler/schedules/handler.go index efe8ab6a0..408c8c65c 100644 --- a/api/http/handler/schedules/handler.go +++ b/api/http/handler/schedules/handler.go @@ -6,7 +6,6 @@ import ( "github.com/gorilla/mux" httperror "github.com/portainer/libhttp/error" "github.com/portainer/portainer" - "github.com/portainer/portainer/cron" "github.com/portainer/portainer/http/security" ) @@ -39,13 +38,3 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler { return h } - -func (handler *Handler) createTaskExecutionContext(scheduleID portainer.ScheduleID, endpoints []portainer.EndpointID) *cron.ScriptTaskContext { - return &cron.ScriptTaskContext{ - JobService: handler.JobService, - EndpointService: handler.EndpointService, - FileService: handler.FileService, - ScheduleID: scheduleID, - TargetEndpoints: endpoints, - } -} diff --git a/api/http/handler/schedules/schedule_create.go b/api/http/handler/schedules/schedule_create.go index 88c124f07..56061ff26 100644 --- a/api/http/handler/schedules/schedule_create.go +++ b/api/http/handler/schedules/schedule_create.go @@ -142,20 +142,27 @@ func (handler *Handler) createSchedule(name, image, cronExpression string, endpo return nil, err } - taskContext := handler.createTaskExecutionContext(scheduleIdentifier, endpoints) - task := cron.NewScriptTask(image, scriptPath, taskContext) - - err = handler.JobScheduler.ScheduleTask(cronExpression, task) - if err != nil { - return nil, err + job := &portainer.ScriptExecutionJob{ + Endpoints: endpoints, + Image: image, + ScriptPath: scriptPath, + ScheduleID: scheduleIdentifier, } schedule := &portainer.Schedule{ - ID: scheduleIdentifier, - Name: name, - Endpoints: endpoints, - CronExpression: cronExpression, - Task: task, + ID: scheduleIdentifier, + Name: name, + CronExpression: cronExpression, + JobType: portainer.ScriptExecutionJobType, + ScriptExecutionJob: job, + } + + jobContext := cron.NewScriptExecutionJobContext(handler.JobService, handler.EndpointService, handler.FileService) + jobRunner := cron.NewScriptExecutionJobRunner(job, jobContext) + + err = handler.JobScheduler.CreateSchedule(schedule, jobRunner) + if err != nil { + return nil, err } err = handler.ScheduleService.CreateSchedule(schedule) diff --git a/api/http/handler/schedules/schedule_delete.go b/api/http/handler/schedules/schedule_delete.go index 33526c568..502f7d7e4 100644 --- a/api/http/handler/schedules/schedule_delete.go +++ b/api/http/handler/schedules/schedule_delete.go @@ -1,6 +1,7 @@ package schedules import ( + "errors" "net/http" httperror "github.com/portainer/libhttp/error" @@ -15,7 +16,16 @@ func (handler *Handler) scheduleDelete(w http.ResponseWriter, r *http.Request) * return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} } - handler.JobScheduler.UnscheduleTask(portainer.ScheduleID(scheduleID)) + schedule, err := handler.ScheduleService.Schedule(portainer.ScheduleID(scheduleID)) + if err == portainer.ErrObjectNotFound { + return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err} + } else if err != nil { + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} + } + + if schedule.JobType == portainer.SnapshotJobType || schedule.JobType == portainer.EndpointSyncJobType { + return &httperror.HandlerError{http.StatusBadRequest, "Cannot remove system schedules", errors.New("Cannot remove system schedule")} + } scheduleFolder := handler.FileService.GetScheduleFolder(portainer.ScheduleID(scheduleID)) err = handler.FileService.RemoveDirectory(scheduleFolder) diff --git a/api/http/handler/schedules/schedule_update.go b/api/http/handler/schedules/schedule_update.go index 561cabc17..209c47da0 100644 --- a/api/http/handler/schedules/schedule_update.go +++ b/api/http/handler/schedules/schedule_update.go @@ -40,14 +40,14 @@ func (handler *Handler) scheduleUpdate(w http.ResponseWriter, r *http.Request) * return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} } - updateTaskSchedule := updateSchedule(schedule, &payload) - if updateTaskSchedule { - taskContext := handler.createTaskExecutionContext(schedule.ID, schedule.Endpoints) - schedule.Task.(cron.ScriptTask).SetContext(taskContext) + updateJobSchedule := updateSchedule(schedule, &payload) + if updateJobSchedule { - err := handler.JobScheduler.UpdateScheduledTask(schedule.ID, schedule.CronExpression, schedule.Task) + jobContext := cron.NewScriptExecutionJobContext(handler.JobService, handler.EndpointService, handler.FileService) + jobRunner := cron.NewScriptExecutionJobRunner(schedule.ScriptExecutionJob, jobContext) + err := handler.JobScheduler.UpdateSchedule(schedule, jobRunner) if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update task scheduler", err} + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update job scheduler", err} } } @@ -60,28 +60,26 @@ func (handler *Handler) scheduleUpdate(w http.ResponseWriter, r *http.Request) * } func updateSchedule(schedule *portainer.Schedule, payload *scheduleUpdatePayload) bool { - updateTaskSchedule := false + updateJobSchedule := false if payload.Name != nil { schedule.Name = *payload.Name } if payload.Endpoints != nil { - schedule.Endpoints = payload.Endpoints - updateTaskSchedule = true + schedule.ScriptExecutionJob.Endpoints = payload.Endpoints + updateJobSchedule = true } if payload.CronExpression != nil { schedule.CronExpression = *payload.CronExpression - updateTaskSchedule = true + updateJobSchedule = true } if payload.Image != nil { - t := schedule.Task.(cron.ScriptTask) - t.Image = *payload.Image - - updateTaskSchedule = true + schedule.ScriptExecutionJob.Image = *payload.Image + updateJobSchedule = true } - return updateTaskSchedule + return updateJobSchedule } diff --git a/api/http/handler/settings/handler.go b/api/http/handler/settings/handler.go index 9440aedd5..d20c388d7 100644 --- a/api/http/handler/settings/handler.go +++ b/api/http/handler/settings/handler.go @@ -16,6 +16,7 @@ type Handler struct { LDAPService portainer.LDAPService FileService portainer.FileService JobScheduler portainer.JobScheduler + ScheduleService portainer.ScheduleService } // NewHandler creates a handler to manage settings operations. diff --git a/api/http/handler/settings/settings_update.go b/api/http/handler/settings/settings_update.go index ad50dd65d..472a34ada 100644 --- a/api/http/handler/settings/settings_update.go +++ b/api/http/handler/settings/settings_update.go @@ -8,7 +8,6 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" "github.com/portainer/portainer" - "github.com/portainer/portainer/cron" "github.com/portainer/portainer/filesystem" ) @@ -78,11 +77,9 @@ func (handler *Handler) settingsUpdate(w http.ResponseWriter, r *http.Request) * } if payload.SnapshotInterval != nil && *payload.SnapshotInterval != settings.SnapshotInterval { - settings.SnapshotInterval = *payload.SnapshotInterval - - err := handler.JobScheduler.UpdateScheduledTask(0, "@every "+*payload.SnapshotInterval, cron.NewSnapshotTask(nil)) + err := handler.updateSnapshotInterval(settings, *payload.SnapshotInterval) if err != nil { - return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update task scheduler", err} + return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update snapshot interval", err} } } @@ -99,6 +96,27 @@ func (handler *Handler) settingsUpdate(w http.ResponseWriter, r *http.Request) * return response.JSON(w, settings) } +func (handler *Handler) updateSnapshotInterval(settings *portainer.Settings, snapshotInterval string) error { + settings.SnapshotInterval = snapshotInterval + + schedules, err := handler.ScheduleService.SchedulesByJobType(portainer.SnapshotJobType) + if err != nil { + return err + } + + if len(schedules) != 0 { + snapshotSchedule := schedules[0] + snapshotSchedule.CronExpression = "@every " + snapshotInterval + + err := handler.JobScheduler.UpdateSchedule(&snapshotSchedule, nil) + if err != nil { + return err + } + } + + return nil +} + func (handler *Handler) updateTLS(settings *portainer.Settings) *httperror.HandlerError { if (settings.LDAPSettings.TLSConfig.TLS || settings.LDAPSettings.StartTLS) && !settings.LDAPSettings.TLSConfig.TLSSkipVerify { caCertPath, _ := handler.FileService.GetPathForTLSFile(filesystem.LDAPStorePath, portainer.TLSFileCA) diff --git a/api/http/server.go b/api/http/server.go index d117471b6..8bcfb6921 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -146,6 +146,7 @@ func (server *Server) Start() error { settingsHandler.LDAPService = server.LDAPService settingsHandler.FileService = server.FileService settingsHandler.JobScheduler = server.JobScheduler + settingsHandler.ScheduleService = server.ScheduleService var stackHandler = stacks.NewHandler(requestBouncer) stackHandler.FileService = server.FileService diff --git a/api/portainer.go b/api/portainer.go index d378486b2..06a8c1b62 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -223,13 +223,38 @@ type ( // ScheduleID represents a schedule identifier. ScheduleID int - // Schedule represents a task that is scheduled on one or multiple endpoints. + // JobType represents a job type + JobType int + + // ScriptExecutionJob represents a scheduled job that can execute a script via a privileged container + ScriptExecutionJob struct { + ScheduleID ScheduleID `json:"ScheduleId"` + Endpoints []EndpointID + Image string + ScriptPath string + } + + // SnapshotJob represents a scheduled job that can create endpoint snapshots + SnapshotJob struct { + ScheduleID ScheduleID `json:"ScheduleId"` + } + + // EndpointSyncJob represents a scheduled job that synchronize endpoints based on an external file + EndpointSyncJob struct { + ScheduleID ScheduleID `json:"ScheduleId"` + } + + // Schedule represents a scheduled job. + // It only contains a pointer to one of the JobRunner implementations + // based on the JobType Schedule struct { - ID ScheduleID `json:"Id"` - Name string `json:"Name"` - Endpoints []EndpointID `json:"Endpoints"` - CronExpression string `json:"Schedule"` - Task Task `json:"Task"` + ID ScheduleID `json:"Id"` + Name string + CronExpression string + JobType JobType + ScriptExecutionJob *ScriptExecutionJob + SnapshotJob *SnapshotJob + EndpointSyncJob *EndpointSyncJob } // WebhookID represents a webhook identifier. @@ -568,6 +593,7 @@ type ( ScheduleService interface { Schedule(ID ScheduleID) (*Schedule, error) Schedules() ([]Schedule, error) + SchedulesByJobType(jobType JobType) ([]Schedule, error) CreateSchedule(schedule *Schedule) error UpdateSchedule(ID ScheduleID, schedule *Schedule) error DeleteSchedule(ID ScheduleID) error @@ -639,15 +665,18 @@ type ( // JobScheduler represents a service to run jobs on a periodic basis JobScheduler interface { - ScheduleTask(cronExpression string, task Task) error - UpdateScheduledTask(ID ScheduleID, cronExpression string, updatedTask Task) error - UnscheduleTask(ID ScheduleID) + CreateSchedule(schedule *Schedule, runner JobRunner) error + UpdateSchedule(schedule *Schedule, runner JobRunner) error + RemoveSchedule(ID ScheduleID) Start() } - // Task represents a process that can be scheduled - Task interface { + // JobRunner represents a service that can be used to run a job + JobRunner interface { Run() + GetScheduleID() ScheduleID + SetScheduleID(ID ScheduleID) + GetJobType() JobType } // Snapshotter represents a service used to create endpoint snapshots @@ -808,3 +837,15 @@ const ( // ServiceWebhook is a webhook for restarting a docker service ServiceWebhook ) + +const ( + _ JobType = iota + // ScriptExecutionJobType is a non-system job used to execute a script against a list of + // endpoints via privileged containers + ScriptExecutionJobType + // SnapshotJobType is a system job used to create endpoint snapshots + SnapshotJobType + // EndpointSyncJobType is a system job used to synchronize endpoints from + // an external definition store + EndpointSyncJobType +)