mirror of https://github.com/k3s-io/k3s.git
Consolidating imaging pulling logic
parent
804109d349
commit
321c289792
|
@ -27,25 +27,29 @@ import (
|
|||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
)
|
||||
|
||||
// imagePuller pulls the image using Runtime.PullImage().
|
||||
// It will check the presence of the image, and report the 'image pulling',
|
||||
// 'image pulled' events correspondingly.
|
||||
type parallelImagePuller struct {
|
||||
// imageManager provides the functionalities for image pulling.
|
||||
type imageManager struct {
|
||||
recorder record.EventRecorder
|
||||
runtime kubecontainer.Runtime
|
||||
backOff *flowcontrol.Backoff
|
||||
// It will check the presence of the image, and report the 'image pulling', image pulled' events correspondingly.
|
||||
puller imagePuller
|
||||
}
|
||||
|
||||
// enforce compatibility.
|
||||
var _ imagePuller = ¶llelImagePuller{}
|
||||
var _ ImageManager = &imageManager{}
|
||||
|
||||
// NewImagePuller takes an event recorder and container runtime to create a
|
||||
// image puller that wraps the container runtime's PullImage interface.
|
||||
func newParallelImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller {
|
||||
return ¶llelImagePuller{
|
||||
func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager {
|
||||
var puller imagePuller
|
||||
if serialized {
|
||||
puller = newSerialImagePuller(runtime)
|
||||
} else {
|
||||
puller = newParallelImagePuller(runtime)
|
||||
}
|
||||
return &imageManager{
|
||||
recorder: recorder,
|
||||
runtime: runtime,
|
||||
backOff: imageBackOff,
|
||||
puller: puller,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,16 +69,16 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool {
|
|||
}
|
||||
|
||||
// records an event using ref, event msg. log to glog using prefix, msg, logFn
|
||||
func (puller *parallelImagePuller) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) {
|
||||
func (m *imageManager) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) {
|
||||
if ref != nil {
|
||||
puller.recorder.Event(ref, eventtype, event, msg)
|
||||
m.recorder.Event(ref, eventtype, event, msg)
|
||||
} else {
|
||||
logFn(fmt.Sprint(prefix, " ", msg))
|
||||
}
|
||||
}
|
||||
|
||||
// PullImage pulls the image for the specified pod and container.
|
||||
func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
|
||||
// EnsureImageExists pulls the image for the specified pod and container.
|
||||
func (m *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
|
||||
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
|
||||
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||
if err != nil {
|
||||
|
@ -82,35 +86,37 @@ func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Contai
|
|||
}
|
||||
|
||||
spec := kubecontainer.ImageSpec{Image: container.Image}
|
||||
present, err := puller.runtime.IsImagePresent(spec)
|
||||
present, err := m.runtime.IsImagePresent(spec)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
|
||||
puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
|
||||
m.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
|
||||
return ErrImageInspect, msg
|
||||
}
|
||||
|
||||
if !shouldPullImage(container, present) {
|
||||
if present {
|
||||
msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
|
||||
puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info)
|
||||
m.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info)
|
||||
return nil, ""
|
||||
} else {
|
||||
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
|
||||
puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning)
|
||||
m.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning)
|
||||
return ErrImageNeverPull, msg
|
||||
}
|
||||
}
|
||||
|
||||
backOffKey := fmt.Sprintf("%s_%s", pod.UID, container.Image)
|
||||
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
|
||||
if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) {
|
||||
msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
|
||||
puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info)
|
||||
m.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info)
|
||||
return ErrImagePullBackOff, msg
|
||||
}
|
||||
puller.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
|
||||
if err := puller.runtime.PullImage(spec, pullSecrets); err != nil {
|
||||
puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
|
||||
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
|
||||
m.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
|
||||
errChan := make(chan error)
|
||||
m.puller.pullImage(spec, pullSecrets, errChan)
|
||||
if err := <-errChan; err != nil {
|
||||
m.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
|
||||
m.backOff.Next(backOffKey, m.backOff.Clock.Now())
|
||||
if err == RegistryUnavailable {
|
||||
msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image)
|
||||
return err, msg
|
||||
|
@ -118,7 +124,7 @@ func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Contai
|
|||
return ErrImagePull, err.Error()
|
||||
}
|
||||
}
|
||||
puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)
|
||||
puller.backOff.GC()
|
||||
m.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)
|
||||
m.backOff.GC()
|
||||
return nil, ""
|
||||
}
|
|
@ -30,6 +30,97 @@ import (
|
|||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
)
|
||||
|
||||
func TestParallelPuller(t *testing.T) {
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "test_pod",
|
||||
Namespace: "test-ns",
|
||||
UID: "bar",
|
||||
ResourceVersion: "42",
|
||||
SelfLink: "/api/v1/pods/foo",
|
||||
}}
|
||||
|
||||
cases := []struct {
|
||||
containerImage string
|
||||
policy api.PullPolicy
|
||||
calledFunctions []string
|
||||
inspectErr error
|
||||
pullerErr error
|
||||
expectedErr []error
|
||||
}{
|
||||
{ // pull missing image
|
||||
containerImage: "missing_image",
|
||||
policy: api.PullIfNotPresent,
|
||||
calledFunctions: []string{"IsImagePresent", "PullImage"},
|
||||
inspectErr: nil,
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{nil}},
|
||||
|
||||
{ // image present, dont pull
|
||||
containerImage: "present_image",
|
||||
policy: api.PullIfNotPresent,
|
||||
calledFunctions: []string{"IsImagePresent"},
|
||||
inspectErr: nil,
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{nil, nil, nil}},
|
||||
// image present, pull it
|
||||
{containerImage: "present_image",
|
||||
policy: api.PullAlways,
|
||||
calledFunctions: []string{"IsImagePresent", "PullImage"},
|
||||
inspectErr: nil,
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{nil, nil, nil}},
|
||||
// missing image, error PullNever
|
||||
{containerImage: "missing_image",
|
||||
policy: api.PullNever,
|
||||
calledFunctions: []string{"IsImagePresent"},
|
||||
inspectErr: nil,
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}},
|
||||
// missing image, unable to inspect
|
||||
{containerImage: "missing_image",
|
||||
policy: api.PullIfNotPresent,
|
||||
calledFunctions: []string{"IsImagePresent"},
|
||||
inspectErr: errors.New("unknown inspectError"),
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}},
|
||||
// missing image, unable to fetch
|
||||
{containerImage: "typo_image",
|
||||
policy: api.PullIfNotPresent,
|
||||
calledFunctions: []string{"IsImagePresent", "PullImage"},
|
||||
inspectErr: nil,
|
||||
pullerErr: errors.New("404"),
|
||||
expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}},
|
||||
}
|
||||
|
||||
for i, c := range cases {
|
||||
container := &api.Container{
|
||||
Name: "container_name",
|
||||
Image: c.containerImage,
|
||||
ImagePullPolicy: c.policy,
|
||||
}
|
||||
|
||||
backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
backOff.Clock = fakeClock
|
||||
|
||||
fakeRuntime := &ctest.FakeRuntime{}
|
||||
fakeRecorder := &record.FakeRecorder{}
|
||||
puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, false)
|
||||
|
||||
fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}}
|
||||
fakeRuntime.Err = c.pullerErr
|
||||
fakeRuntime.InspectErr = c.inspectErr
|
||||
|
||||
for tick, expected := range c.expectedErr {
|
||||
fakeClock.Step(time.Second)
|
||||
err, _ := puller.EnsureImageExists(pod, container, nil)
|
||||
fakeRuntime.AssertCalls(c.calledFunctions)
|
||||
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSerializedPuller(t *testing.T) {
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
|
@ -106,7 +197,7 @@ func TestSerializedPuller(t *testing.T) {
|
|||
|
||||
fakeRuntime := &ctest.FakeRuntime{}
|
||||
fakeRecorder := &record.FakeRecorder{}
|
||||
puller := newSerializedImagePuller(fakeRecorder, fakeRuntime, backOff)
|
||||
puller := NewImageManager(fakeRecorder, fakeRuntime, backOff, true)
|
||||
|
||||
fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 0}}
|
||||
fakeRuntime.Err = c.pullerErr
|
||||
|
@ -114,10 +205,9 @@ func TestSerializedPuller(t *testing.T) {
|
|||
|
||||
for tick, expected := range c.expectedErr {
|
||||
fakeClock.Step(time.Second)
|
||||
err, _ := puller.pullImage(pod, container, nil)
|
||||
err, _ := puller.EnsureImageExists(pod, container, nil)
|
||||
fakeRuntime.AssertCalls(c.calledFunctions)
|
||||
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package images
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
)
|
||||
|
||||
type imageManager struct {
|
||||
recorder record.EventRecorder
|
||||
runtime kubecontainer.Runtime
|
||||
backOff *flowcontrol.Backoff
|
||||
imagePuller imagePuller
|
||||
}
|
||||
|
||||
var _ ImageManager = &imageManager{}
|
||||
|
||||
func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager {
|
||||
var imagePuller imagePuller
|
||||
if serialized {
|
||||
imagePuller = newSerializedImagePuller(recorder, runtime, imageBackOff)
|
||||
} else {
|
||||
imagePuller = newParallelImagePuller(recorder, runtime, imageBackOff)
|
||||
}
|
||||
return &imageManager{
|
||||
recorder: recorder,
|
||||
runtime: runtime,
|
||||
backOff: imageBackOff,
|
||||
imagePuller: imagePuller,
|
||||
}
|
||||
}
|
||||
|
||||
func (im *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
|
||||
return im.imagePuller.pullImage(pod, container, pullSecrets)
|
||||
}
|
|
@ -1,123 +0,0 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package images
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
. "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
ctest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/util/clock"
|
||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
)
|
||||
|
||||
func TestPuller(t *testing.T) {
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "test_pod",
|
||||
Namespace: "test-ns",
|
||||
UID: "bar",
|
||||
ResourceVersion: "42",
|
||||
SelfLink: "/api/v1/pods/foo",
|
||||
}}
|
||||
|
||||
cases := []struct {
|
||||
containerImage string
|
||||
policy api.PullPolicy
|
||||
calledFunctions []string
|
||||
inspectErr error
|
||||
pullerErr error
|
||||
expectedErr []error
|
||||
}{
|
||||
{ // pull missing image
|
||||
containerImage: "missing_image",
|
||||
policy: api.PullIfNotPresent,
|
||||
calledFunctions: []string{"IsImagePresent", "PullImage"},
|
||||
inspectErr: nil,
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{nil}},
|
||||
|
||||
{ // image present, don't pull
|
||||
containerImage: "present_image",
|
||||
policy: api.PullIfNotPresent,
|
||||
calledFunctions: []string{"IsImagePresent"},
|
||||
inspectErr: nil,
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{nil, nil, nil}},
|
||||
// image present, pull it
|
||||
{containerImage: "present_image",
|
||||
policy: api.PullAlways,
|
||||
calledFunctions: []string{"IsImagePresent", "PullImage"},
|
||||
inspectErr: nil,
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{nil, nil, nil}},
|
||||
// missing image, error PullNever
|
||||
{containerImage: "missing_image",
|
||||
policy: api.PullNever,
|
||||
calledFunctions: []string{"IsImagePresent"},
|
||||
inspectErr: nil,
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}},
|
||||
// missing image, unable to inspect
|
||||
{containerImage: "missing_image",
|
||||
policy: api.PullIfNotPresent,
|
||||
calledFunctions: []string{"IsImagePresent"},
|
||||
inspectErr: errors.New("unknown inspectError"),
|
||||
pullerErr: nil,
|
||||
expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}},
|
||||
// missing image, unable to fetch
|
||||
{containerImage: "typo_image",
|
||||
policy: api.PullIfNotPresent,
|
||||
calledFunctions: []string{"IsImagePresent", "PullImage"},
|
||||
inspectErr: nil,
|
||||
pullerErr: errors.New("404"),
|
||||
expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}},
|
||||
}
|
||||
|
||||
for i, c := range cases {
|
||||
container := &api.Container{
|
||||
Name: "container_name",
|
||||
Image: c.containerImage,
|
||||
ImagePullPolicy: c.policy,
|
||||
}
|
||||
|
||||
backOff := flowcontrol.NewBackOff(time.Second, time.Minute)
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
backOff.Clock = fakeClock
|
||||
|
||||
fakeRuntime := &ctest.FakeRuntime{}
|
||||
fakeRecorder := &record.FakeRecorder{}
|
||||
puller := newParallelImagePuller(fakeRecorder, fakeRuntime, backOff)
|
||||
|
||||
fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}}
|
||||
fakeRuntime.Err = c.pullerErr
|
||||
fakeRuntime.InspectErr = c.inspectErr
|
||||
|
||||
for tick, expected := range c.expectedErr {
|
||||
fakeClock.Step(time.Second)
|
||||
err, _ := puller.pullImage(pod, container, nil)
|
||||
fakeRuntime.AssertCalls(c.calledFunctions)
|
||||
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package images
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
type imagePuller interface {
|
||||
pullImage(kubecontainer.ImageSpec, []api.Secret, chan<- error)
|
||||
}
|
||||
|
||||
var _, _ imagePuller = ¶llelImagePuller{}, &serialImagePuller{}
|
||||
|
||||
type parallelImagePuller struct {
|
||||
runtime kubecontainer.Runtime
|
||||
}
|
||||
|
||||
func newParallelImagePuller(runtime kubecontainer.Runtime) imagePuller {
|
||||
return ¶llelImagePuller{runtime}
|
||||
}
|
||||
|
||||
func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []api.Secret, errChan chan<- error) {
|
||||
go func() {
|
||||
errChan <- pip.runtime.PullImage(spec, pullSecrets)
|
||||
}()
|
||||
}
|
||||
|
||||
// Maximum number of image pull requests than can be queued.
|
||||
const maxImagePullRequests = 10
|
||||
|
||||
type serialImagePuller struct {
|
||||
runtime kubecontainer.Runtime
|
||||
pullRequests chan *imagePullRequest
|
||||
}
|
||||
|
||||
func newSerialImagePuller(runtime kubecontainer.Runtime) imagePuller {
|
||||
imagePuller := &serialImagePuller{runtime, make(chan *imagePullRequest, maxImagePullRequests)}
|
||||
go wait.Until(imagePuller.processImagePullRequests, time.Second, wait.NeverStop)
|
||||
return imagePuller
|
||||
}
|
||||
|
||||
type imagePullRequest struct {
|
||||
spec kubecontainer.ImageSpec
|
||||
pullSecrets []api.Secret
|
||||
errChan chan<- error
|
||||
}
|
||||
|
||||
func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []api.Secret, errChan chan<- error) {
|
||||
sip.pullRequests <- &imagePullRequest{
|
||||
spec: spec,
|
||||
pullSecrets: pullSecrets,
|
||||
errChan: errChan,
|
||||
}
|
||||
}
|
||||
|
||||
func (sip *serialImagePuller) processImagePullRequests() {
|
||||
for pullRequest := range sip.pullRequests {
|
||||
pullRequest.errChan <- sip.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets)
|
||||
}
|
||||
}
|
|
@ -1,143 +0,0 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package images
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
type imagePullRequest struct {
|
||||
spec kubecontainer.ImageSpec
|
||||
container *api.Container
|
||||
pullSecrets []api.Secret
|
||||
logPrefix string
|
||||
ref *api.ObjectReference
|
||||
returnChan chan<- error
|
||||
}
|
||||
|
||||
// serializedImagePuller pulls the image using Runtime.PullImage().
|
||||
// It will check the presence of the image, and report the 'image pulling',
|
||||
// 'image pulled' events correspondingly.
|
||||
type serializedImagePuller struct {
|
||||
recorder record.EventRecorder
|
||||
runtime kubecontainer.Runtime
|
||||
backOff *flowcontrol.Backoff
|
||||
pullRequests chan *imagePullRequest
|
||||
}
|
||||
|
||||
// enforce compatibility.
|
||||
var _ imagePuller = &serializedImagePuller{}
|
||||
|
||||
// NewSerializedImagePuller takes an event recorder and container runtime to create a
|
||||
// image puller that wraps the container runtime's PullImage interface.
|
||||
// Pulls one image at a time.
|
||||
// Issue #10959 has the rationale behind serializing image pulls.
|
||||
func newSerializedImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller {
|
||||
imagePuller := &serializedImagePuller{
|
||||
recorder: recorder,
|
||||
runtime: runtime,
|
||||
backOff: imageBackOff,
|
||||
pullRequests: make(chan *imagePullRequest, 10),
|
||||
}
|
||||
go wait.Until(imagePuller.pullImages, time.Second, wait.NeverStop)
|
||||
return imagePuller
|
||||
}
|
||||
|
||||
// records an event using ref, event msg. log to glog using prefix, msg, logFn
|
||||
func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) {
|
||||
if ref != nil {
|
||||
puller.recorder.Event(ref, eventtype, event, msg)
|
||||
} else {
|
||||
logFn(fmt.Sprint(prefix, " ", msg))
|
||||
}
|
||||
}
|
||||
|
||||
// PullImage pulls the image for the specified pod and container.
|
||||
func (puller *serializedImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
|
||||
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
|
||||
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
||||
}
|
||||
|
||||
spec := kubecontainer.ImageSpec{Image: container.Image}
|
||||
present, err := puller.runtime.IsImagePresent(spec)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
|
||||
puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
|
||||
return ErrImageInspect, msg
|
||||
}
|
||||
|
||||
if !shouldPullImage(container, present) {
|
||||
if present {
|
||||
msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
|
||||
puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, msg, glog.Info)
|
||||
return nil, ""
|
||||
} else {
|
||||
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
|
||||
puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning)
|
||||
return ErrImageNeverPull, msg
|
||||
}
|
||||
}
|
||||
|
||||
backOffKey := fmt.Sprintf("%s_%s", pod.UID, container.Image)
|
||||
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
|
||||
msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
|
||||
puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info)
|
||||
return ErrImagePullBackOff, msg
|
||||
}
|
||||
|
||||
// enqueue image pull request and wait for response.
|
||||
returnChan := make(chan error)
|
||||
puller.pullRequests <- &imagePullRequest{
|
||||
spec: spec,
|
||||
container: container,
|
||||
pullSecrets: pullSecrets,
|
||||
logPrefix: logPrefix,
|
||||
ref: ref,
|
||||
returnChan: returnChan,
|
||||
}
|
||||
if err = <-returnChan; err != nil {
|
||||
puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
|
||||
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
|
||||
if err == RegistryUnavailable {
|
||||
msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image)
|
||||
return err, msg
|
||||
} else {
|
||||
return ErrImagePull, err.Error()
|
||||
}
|
||||
}
|
||||
puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)
|
||||
puller.backOff.GC()
|
||||
return nil, ""
|
||||
}
|
||||
|
||||
func (puller *serializedImagePuller) pullImages() {
|
||||
for pullRequest := range puller.pullRequests {
|
||||
puller.logIt(pullRequest.ref, api.EventTypeNormal, events.PullingImage, pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info)
|
||||
pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets)
|
||||
}
|
||||
}
|
|
@ -50,10 +50,3 @@ type ImageManager interface {
|
|||
|
||||
// TODO(ronl): consolidating image managing and deleting operation in this interface
|
||||
}
|
||||
|
||||
// ImagePuller wraps Runtime.PullImage() to pull a container image.
|
||||
// It will check the presence of the image, and report the 'image pulling',
|
||||
// 'image pulled' events correspondingly.
|
||||
type imagePuller interface {
|
||||
pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue