feat(jobs): add the job execution API
* feat(jobs): add job service interface * feat(jobs): create job execution api * style(jobs): remove comment * feat(jobs): add bindings * feat(jobs): validate payload different cases * refactor(jobs): rename endpointJob method * refactor(jobs): return original error * feat(jobs): pull image before creating container * feat(jobs): run jobs with sh * style(jobs): remove comment * refactor(jobs): change error names * feat(jobs): sync pull image * fix(jobs): close image reader after error check * style(jobs): remove comment and add docs * refactor(jobs): inline script command * fix(jobs): handle pul image error * refactor(jobs): handle image pull output * fix(docker): set http client timeout to 100s * fix(client): remove timeout from http clientpull/2391/head
parent
719299d75b
commit
65291c68e9
|
@ -7,13 +7,13 @@ import (
|
|||
|
||||
// TarFileInBuffer will create a tar archive containing a single file named via fileName and using the content
|
||||
// specified in fileContent. Returns the archive as a byte array.
|
||||
func TarFileInBuffer(fileContent []byte, fileName string) ([]byte, error) {
|
||||
func TarFileInBuffer(fileContent []byte, fileName string, mode int64) ([]byte, error) {
|
||||
var buffer bytes.Buffer
|
||||
tarWriter := tar.NewWriter(&buffer)
|
||||
|
||||
header := &tar.Header{
|
||||
Name: fileName,
|
||||
Mode: 0600,
|
||||
Mode: mode,
|
||||
Size: int64(len(fileContent)),
|
||||
}
|
||||
|
||||
|
|
|
@ -383,6 +383,10 @@ func initEndpoint(flags *portainer.CLIFlags, endpointService portainer.EndpointS
|
|||
return createUnsecuredEndpoint(*flags.EndpointURL, endpointService, snapshotter)
|
||||
}
|
||||
|
||||
func initJobService(dockerClientFactory *docker.ClientFactory) portainer.JobService {
|
||||
return docker.NewJobService(dockerClientFactory)
|
||||
}
|
||||
|
||||
func main() {
|
||||
flags := initCLI()
|
||||
|
||||
|
@ -408,6 +412,8 @@ func main() {
|
|||
|
||||
clientFactory := initClientFactory(digitalSignatureService)
|
||||
|
||||
jobService := initJobService(clientFactory)
|
||||
|
||||
snapshotter := initSnapshotter(clientFactory)
|
||||
|
||||
jobScheduler, err := initJobScheduler(store.EndpointService, snapshotter, flags)
|
||||
|
@ -520,6 +526,7 @@ func main() {
|
|||
SSLCert: *flags.SSLCert,
|
||||
SSLKey: *flags.SSLKey,
|
||||
DockerClientFactory: clientFactory,
|
||||
JobService: jobService,
|
||||
}
|
||||
|
||||
log.Printf("Starting Portainer %s on %s", portainer.APIVersion, *flags.Addr)
|
||||
|
|
|
@ -3,7 +3,6 @@ package docker
|
|||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/portainer/portainer"
|
||||
|
@ -97,7 +96,6 @@ func httpClient(endpoint *portainer.Endpoint) (*http.Client, error) {
|
|||
}
|
||||
|
||||
return &http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
Transport: transport,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
package docker
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"strconv"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/network"
|
||||
"github.com/docker/docker/api/types/strslice"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/portainer/portainer"
|
||||
"github.com/portainer/portainer/archive"
|
||||
)
|
||||
|
||||
// JobService represnts a service that handles jobs on the host
|
||||
type JobService struct {
|
||||
DockerClientFactory *ClientFactory
|
||||
}
|
||||
|
||||
// NewJobService returns a pointer to a new job service
|
||||
func NewJobService(dockerClientFactory *ClientFactory) *JobService {
|
||||
return &JobService{
|
||||
DockerClientFactory: dockerClientFactory,
|
||||
}
|
||||
}
|
||||
|
||||
// Execute will execute a script on the endpoint host with the supplied image as a container
|
||||
func (service *JobService) Execute(endpoint *portainer.Endpoint, image string, script []byte) error {
|
||||
buffer, err := archive.TarFileInBuffer(script, "script.sh", 0700)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cli, err := service.DockerClientFactory.CreateClient(endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
err = pullImage(cli, image)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
containerConfig := &container.Config{
|
||||
AttachStdin: true,
|
||||
AttachStdout: true,
|
||||
AttachStderr: true,
|
||||
Tty: true,
|
||||
WorkingDir: "/tmp",
|
||||
Image: image,
|
||||
Labels: map[string]string{
|
||||
"io.portainer.job.endpoint": strconv.Itoa(int(endpoint.ID)),
|
||||
},
|
||||
Cmd: strslice.StrSlice([]string{"sh", "/tmp/script.sh"}),
|
||||
}
|
||||
|
||||
hostConfig := &container.HostConfig{
|
||||
Binds: []string{"/:/host", "/etc:/etc:ro", "/usr:/usr:ro", "/run:/run:ro", "/sbin:/sbin:ro", "/var:/var:ro"},
|
||||
NetworkMode: "host",
|
||||
Privileged: true,
|
||||
}
|
||||
|
||||
networkConfig := &network.NetworkingConfig{}
|
||||
|
||||
body, err := cli.ContainerCreate(context.Background(), containerConfig, hostConfig, networkConfig, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
copyOptions := types.CopyToContainerOptions{}
|
||||
err = cli.CopyToContainer(context.Background(), body.ID, "/tmp", bytes.NewReader(buffer), copyOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
startOptions := types.ContainerStartOptions{}
|
||||
err = cli.ContainerStart(context.Background(), body.ID, startOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func pullImage(cli *client.Client, image string) error {
|
||||
imageReadCloser, err := cli.ImagePull(context.Background(), image, types.ImagePullOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer imageReadCloser.Close()
|
||||
_, err = io.Copy(ioutil.Discard, imageReadCloser)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
package endpoints
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/asaskevich/govalidator"
|
||||
httperror "github.com/portainer/libhttp/error"
|
||||
"github.com/portainer/libhttp/request"
|
||||
"github.com/portainer/libhttp/response"
|
||||
"github.com/portainer/portainer"
|
||||
)
|
||||
|
||||
type endpointJobFromFilePayload struct {
|
||||
Image string
|
||||
File []byte
|
||||
}
|
||||
|
||||
type endpointJobFromFileContentPayload struct {
|
||||
Image string
|
||||
FileContent string
|
||||
}
|
||||
|
||||
func (payload *endpointJobFromFilePayload) Validate(r *http.Request) error {
|
||||
file, _, err := request.RetrieveMultiPartFormFile(r, "File")
|
||||
if err != nil {
|
||||
return portainer.Error("Invalid Script file. Ensure that the file is uploaded correctly")
|
||||
}
|
||||
payload.File = file
|
||||
|
||||
image, err := request.RetrieveMultiPartFormValue(r, "Image", false)
|
||||
if err != nil {
|
||||
return portainer.Error("Invalid image name")
|
||||
}
|
||||
payload.Image = image
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (payload *endpointJobFromFileContentPayload) Validate(r *http.Request) error {
|
||||
if govalidator.IsNull(payload.FileContent) {
|
||||
return portainer.Error("Invalid script file content")
|
||||
}
|
||||
|
||||
if govalidator.IsNull(payload.Image) {
|
||||
return portainer.Error("Invalid image name")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// POST request on /api/endpoints/:id/job?method
|
||||
func (handler *Handler) endpointJob(w http.ResponseWriter, r *http.Request) *httperror.HandlerError {
|
||||
endpointID, err := request.RetrieveNumericRouteVariableValue(r, "id")
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusBadRequest, "Invalid endpoint identifier route variable", err}
|
||||
}
|
||||
|
||||
method, err := request.RetrieveQueryParameter(r, "method", false)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusBadRequest, "Invalid query parameter: method", err}
|
||||
}
|
||||
|
||||
endpoint, err := handler.EndpointService.Endpoint(portainer.EndpointID(endpointID))
|
||||
if err == portainer.ErrObjectNotFound {
|
||||
return &httperror.HandlerError{http.StatusNotFound, "Unable to find an endpoint with the specified identifier inside the database", err}
|
||||
} else if err != nil {
|
||||
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find an endpoint with the specified identifier inside the database", err}
|
||||
}
|
||||
|
||||
err = handler.requestBouncer.EndpointAccess(r, endpoint)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusForbidden, "Permission denied to access endpoint", portainer.ErrEndpointAccessDenied}
|
||||
}
|
||||
|
||||
switch method {
|
||||
case "file":
|
||||
return handler.executeJobFromFile(w, r, endpoint)
|
||||
case "string":
|
||||
return handler.executeJobFromFileContent(w, r, endpoint)
|
||||
}
|
||||
|
||||
return &httperror.HandlerError{http.StatusBadRequest, "Invalid value for query parameter: method. Value must be one of: string or file", errors.New(request.ErrInvalidQueryParameter)}
|
||||
}
|
||||
|
||||
func (handler *Handler) executeJobFromFile(w http.ResponseWriter, r *http.Request, endpoint *portainer.Endpoint) *httperror.HandlerError {
|
||||
payload := &endpointJobFromFilePayload{}
|
||||
err := payload.Validate(r)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err}
|
||||
}
|
||||
|
||||
err = handler.JobService.Execute(endpoint, payload.Image, payload.File)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusInternalServerError, "Failed executing job", err}
|
||||
}
|
||||
|
||||
return response.Empty(w)
|
||||
}
|
||||
|
||||
func (handler *Handler) executeJobFromFileContent(w http.ResponseWriter, r *http.Request, endpoint *portainer.Endpoint) *httperror.HandlerError {
|
||||
var payload endpointJobFromFileContentPayload
|
||||
err := request.DecodeAndValidateJSONPayload(r, &payload)
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusBadRequest, "Invalid request payload", err}
|
||||
}
|
||||
|
||||
err = handler.JobService.Execute(endpoint, payload.Image, []byte(payload.FileContent))
|
||||
if err != nil {
|
||||
return &httperror.HandlerError{http.StatusInternalServerError, "Failed executing job", err}
|
||||
}
|
||||
|
||||
return response.Empty(w)
|
||||
}
|
|
@ -31,6 +31,7 @@ type Handler struct {
|
|||
FileService portainer.FileService
|
||||
ProxyManager *proxy.Manager
|
||||
Snapshotter portainer.Snapshotter
|
||||
JobService portainer.JobService
|
||||
}
|
||||
|
||||
// NewHandler creates a handler to manage endpoint operations.
|
||||
|
@ -59,6 +60,7 @@ func NewHandler(bouncer *security.RequestBouncer, authorizeEndpointManagement bo
|
|||
bouncer.AuthenticatedAccess(httperror.LoggerHandler(h.endpointExtensionAdd))).Methods(http.MethodPost)
|
||||
h.Handle("/endpoints/{id}/extensions/{extensionType}",
|
||||
bouncer.AuthenticatedAccess(httperror.LoggerHandler(h.endpointExtensionRemove))).Methods(http.MethodDelete)
|
||||
|
||||
h.Handle("/endpoints/{id}/job",
|
||||
bouncer.AdministratorAccess(httperror.LoggerHandler(h.endpointJob))).Methods(http.MethodPost)
|
||||
return h
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ func buildOperation(request *http.Request) error {
|
|||
dockerfileContent = []byte(req.Content)
|
||||
}
|
||||
|
||||
buffer, err := archive.TarFileInBuffer(dockerfileContent, "Dockerfile")
|
||||
buffer, err := archive.TarFileInBuffer(dockerfileContent, "Dockerfile", 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -68,6 +68,7 @@ type Server struct {
|
|||
SSLCert string
|
||||
SSLKey string
|
||||
DockerClientFactory *docker.ClientFactory
|
||||
JobService portainer.JobService
|
||||
}
|
||||
|
||||
// Start starts the HTTP server
|
||||
|
@ -109,6 +110,7 @@ func (server *Server) Start() error {
|
|||
endpointHandler.FileService = server.FileService
|
||||
endpointHandler.ProxyManager = proxyManager
|
||||
endpointHandler.Snapshotter = server.Snapshotter
|
||||
endpointHandler.JobService = server.JobService
|
||||
|
||||
var endpointGroupHandler = endpointgroups.NewHandler(requestBouncer)
|
||||
endpointGroupHandler.EndpointGroupService = server.EndpointGroupService
|
||||
|
|
|
@ -635,6 +635,11 @@ type (
|
|||
Up(stack *Stack, endpoint *Endpoint) error
|
||||
Down(stack *Stack, endpoint *Endpoint) error
|
||||
}
|
||||
|
||||
// JobService represtents a service that manages job execution on hosts
|
||||
JobService interface {
|
||||
Execute(endpoint *Endpoint, image string, script []byte) error
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in New Issue