diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 69318702e1..443d2f744a 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" utilnet "k8s.io/kubernetes/pkg/util/net" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/volume/empty_dir" @@ -964,7 +965,7 @@ func main() { addFlags(pflag.CommandLine) util.InitFlags() - util.ReallyCrash = true + utilruntime.ReallyCrash = true util.InitLogs() defer util.FlushLogs() diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index acc69001a0..43e79a122f 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -61,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/util/mount" nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" + "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/volume" ) @@ -299,7 +300,7 @@ func Run(s *options.KubeletServer, kcfg *KubeletConfig) error { } } - util.ReallyCrash = s.ReallyCrashForTesting + runtime.ReallyCrash = s.ReallyCrashForTesting rand.Seed(time.Now().UTC().UnixNano()) credentialprovider.SetPreferredDockercfgPath(s.RootDirectory) diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 264a823038..0976324b0c 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -44,7 +44,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kruntime "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" ) const ( @@ -608,7 +608,7 @@ func (k *Executor) doShutdown(driver bindings.ExecutorDriver) { if k.shutdownAlert != nil { func() { - util.HandleCrash() + utilruntime.HandleCrash() k.shutdownAlert() }() } diff --git a/contrib/mesos/pkg/executor/service/kubelet.go b/contrib/mesos/pkg/executor/service/kubelet.go index 191f0e00e7..7e089b372c 100644 --- a/contrib/mesos/pkg/executor/service/kubelet.go +++ b/contrib/mesos/pkg/executor/service/kubelet.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) // executorKubelet decorates the kubelet with a Run function that notifies the @@ -38,7 +39,7 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) { // When this Run function is called, we close it here. // Otherwise, KubeletExecutorServer.runKubelet will. close(kl.kubeletDone) - util.HandleCrash() + runtime.HandleCrash() log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity // important: never return! 
this is in our contract select {} diff --git a/contrib/mesos/pkg/proc/proc.go b/contrib/mesos/pkg/proc/proc.go index 47c85922b4..a7a827755c 100644 --- a/contrib/mesos/pkg/proc/proc.go +++ b/contrib/mesos/pkg/proc/proc.go @@ -19,7 +19,7 @@ package proc import ( "sync" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) const ( @@ -84,7 +84,7 @@ func stateRun(ps *processState, a *scheduledAction) stateFn { close(a.errCh) // signal that action was scheduled func() { // we don't trust clients of this package - defer util.HandleCrash() + defer runtime.HandleCrash() a.action() }() return stateRun diff --git a/contrib/mesos/pkg/runtime/metrics.go b/contrib/mesos/pkg/runtime/metrics.go index be5232dba2..139c373b79 100644 --- a/contrib/mesos/pkg/runtime/metrics.go +++ b/contrib/mesos/pkg/runtime/metrics.go @@ -20,7 +20,7 @@ import ( "sync" "github.com/prometheus/client_golang/prometheus" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) const ( @@ -42,6 +42,6 @@ var registerMetrics sync.Once func Register() { registerMetrics.Do(func() { prometheus.MustRegister(panicCounter) - util.PanicHandlers = append(util.PanicHandlers, func(interface{}) { panicCounter.Inc() }) + runtime.PanicHandlers = append(runtime.PanicHandlers, func(interface{}) { panicCounter.Inc() }) }) } diff --git a/contrib/mesos/pkg/runtime/util.go b/contrib/mesos/pkg/runtime/util.go index e3064e04f1..64e79206ac 100644 --- a/contrib/mesos/pkg/runtime/util.go +++ b/contrib/mesos/pkg/runtime/util.go @@ -21,7 +21,7 @@ import ( "sync" "time" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) type Signal <-chan struct{} @@ -90,7 +90,7 @@ func After(f func()) Signal { ch := make(chan struct{}) go func() { defer close(ch) - defer util.HandleCrash() + defer runtime.HandleCrash() if f != nil { f() } @@ -111,7 +111,7 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) { default: } func() { - defer util.HandleCrash() + defer runtime.HandleCrash() f() }() select { diff --git a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go index ebe5a0ea13..af0eaca1b0 100644 --- a/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go +++ b/contrib/mesos/pkg/scheduler/components/errorhandler/errorhandler.go @@ -25,7 +25,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) type ErrorHandler interface { @@ -57,7 +57,7 @@ func (k *errorHandler) Error(pod *api.Pod, schedulingErr error) { } log.Infof("Error scheduling %v: %v; retrying", pod.Name, schedulingErr) - defer util.HandleCrash() + defer runtime.HandleCrash() // default upstream scheduler passes pod.Name as binding.PodID ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace) diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index a4c5cd565d..3ad1e0c28b 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/intstr" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" @@ -118,14 +119,14 @@ type endpointController 
struct { // Runs e; will not return until stopCh is closed. workers determines how many // endpoints will be handled in parallel. func (e *endpointController) Run(workers int, stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() go e.serviceController.Run(stopCh) go e.podController.Run(stopCh) for i := 0; i < workers; i++ { go util.Until(e.worker, time.Second, stopCh) } go func() { - defer util.HandleCrash() + defer utilruntime.HandleCrash() time.Sleep(5 * time.Minute) // give time for our cache to fill e.checkLeftoverEndpoints() }() diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 8f3c7aa898..8f53bd74dd 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -38,10 +38,10 @@ import ( "k8s.io/kubernetes/pkg/apiserver/metrics" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/flushwriter" utilnet "k8s.io/kubernetes/pkg/util/net" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wsstream" "k8s.io/kubernetes/pkg/version" @@ -343,7 +343,7 @@ func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSeri if wsstream.IsWebSocketRequest(req) { r := wsstream.NewReader(out, true) if err := r.Copy(w, req); err != nil { - util.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err)) + utilruntime.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err)) } return } @@ -392,7 +392,7 @@ func errorNegotiated(err error, s runtime.NegotiatedSerializer, gv unversioned.G // errorJSONFatal renders an error to the response, and if codec fails will render plaintext. // Returns the HTTP status code of the error. func errorJSONFatal(err error, codec runtime.Encoder, w http.ResponseWriter) int { - util.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err)) + utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err)) status := errToAPIStatus(err) code := int(status.Code) output, err := runtime.Encode(codec, status) diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go index c7c5e9a90f..4f9d16a268 100644 --- a/pkg/apiserver/errors.go +++ b/pkg/apiserver/errors.go @@ -23,7 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/storage" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) // statusError is an object that can be converted into an unversioned.Status @@ -60,7 +60,7 @@ func errToAPIStatus(err error) *unversioned.Status { // by REST storage - these typically indicate programmer // error by not using pkg/api/errors, or unexpected failure // cases. 
- util.HandleError(fmt.Errorf("apiserver received an error that is not an unversioned.Status: %v", err)) + runtime.HandleError(fmt.Errorf("apiserver received an error that is not an unversioned.Status: %v", err)) return &unversioned.Status{ Status: unversioned.StatusFailure, Code: int32(status), diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 43674b933e..33a936ad57 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/strategicpatch" "github.com/emicklei/go-restful" @@ -906,7 +907,7 @@ func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, panicCh := make(chan interface{}, 1) go func() { // panics don't cross goroutine boundaries, so we have to handle ourselves - defer util.HandleCrash(func(panicReason interface{}) { + defer utilruntime.HandleCrash(func(panicReason interface{}) { // Propagate to parent goroutine panicCh <- panicReason }) diff --git a/pkg/client/cache/expiration_cache.go b/pkg/client/cache/expiration_cache.go index 0881bd0f06..5eb996b66b 100644 --- a/pkg/client/cache/expiration_cache.go +++ b/pkg/client/cache/expiration_cache.go @@ -21,6 +21,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) // ExpirationCache implements the store interface @@ -90,7 +91,7 @@ func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) { // fails; as long as we only return un-expired entries a // reader doesn't need to wait for the result of the delete. go func() { - defer util.HandleCrash() + defer runtime.HandleCrash() c.cacheStorage.Delete(key) }() return nil, false diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 3bf215bb71..59eaac779a 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/watch" ) @@ -267,7 +268,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { case io.ErrUnexpectedEOF: glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) default: - util.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) + utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) } // If this is "connection refused" error, it means that most likely apiserver is not responsive. // It doesn't make sense to re-list all objects because most likely we will be able to restart @@ -329,12 +330,12 @@ loop: return apierrs.FromObject(event.Object) } if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a { - util.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) + utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) continue } meta, err := meta.Accessor(event.Object) if err != nil { - util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) + utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) continue } newResourceVersion := meta.GetResourceVersion() @@ -349,7 +350,7 @@ loop: // to change this. 
r.store.Delete(event.Object) default: - util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) + utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) diff --git a/pkg/client/leaderelection/leaderelection.go b/pkg/client/leaderelection/leaderelection.go index b1125d3131..6c2f52c663 100644 --- a/pkg/client/leaderelection/leaderelection.go +++ b/pkg/client/leaderelection/leaderelection.go @@ -61,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" @@ -169,7 +170,7 @@ type LeaderElectionRecord struct { // Run starts the leader election loop func (le *LeaderElector) Run() { defer func() { - util.HandleCrash() + runtime.HandleCrash() le.config.Callbacks.OnStoppedLeading() }() le.acquire() diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index 83875cb839..221a8cc340 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -27,6 +27,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -120,7 +121,7 @@ func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrel event = &eventCopy result, err := eventCorrelator.EventCorrelate(event) if err != nil { - util.HandleError(err) + utilruntime.HandleError(err) } if result.Skip { return @@ -216,7 +217,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format stri func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*api.Event)) watch.Interface { watcher := eventBroadcaster.Watch() go func() { - defer util.HandleCrash() + defer utilruntime.HandleCrash() for { watchEvent, open := <-watcher.ResultChan() if !open { @@ -262,7 +263,7 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unv go func() { // NOTE: events should be a non-blocking operation - defer util.HandleCrash() + defer utilruntime.HandleCrash() recorder.Action(watch.Added, event) }() } diff --git a/pkg/client/unversioned/portforward/portforward.go b/pkg/client/unversioned/portforward/portforward.go index 8bde0b9920..78e6695e6f 100644 --- a/pkg/client/unversioned/portforward/portforward.go +++ b/pkg/client/unversioned/portforward/portforward.go @@ -30,8 +30,8 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/kubelet/server/portforward" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/httpstream" + "k8s.io/kubernetes/pkg/util/runtime" ) // PortForwarder knows how to listen for local connections and forward them to @@ -165,7 +165,7 @@ func (pf *PortForwarder) forward() error { select { case <-pf.stopChan: case <-pf.streamConn.CloseChan(): - util.HandleError(errors.New("lost connection to pod")) + runtime.HandleError(errors.New("lost connection to pod")) } return nil @@ -199,7 +199,7 @@ func (pf *PortForwarder) listenOnPortAndAddress(port *ForwardedPort, protocol st func (pf *PortForwarder) getListener(protocol string, hostname string, port *ForwardedPort) (net.Listener, error) { listener, err := net.Listen(protocol, fmt.Sprintf("%s:%d", hostname, port.Local)) if err != nil { - 
util.HandleError(fmt.Errorf("Unable to create listener: Error %s", err)) + runtime.HandleError(fmt.Errorf("Unable to create listener: Error %s", err)) return nil, err } listenerAddress := listener.Addr().String() @@ -223,7 +223,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded if err != nil { // TODO consider using something like https://github.com/hydrogen18/stoppableListener? if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { - util.HandleError(fmt.Errorf("Error accepting connection on port %d: %v", port.Local, err)) + runtime.HandleError(fmt.Errorf("Error accepting connection on port %d: %v", port.Local, err)) } return } @@ -255,7 +255,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { headers.Set(api.PortForwardRequestIDHeader, strconv.Itoa(requestID)) errorStream, err := pf.streamConn.CreateStream(headers) if err != nil { - util.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err)) + runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err)) return } // we're not writing to this stream @@ -277,7 +277,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { headers.Set(api.StreamType, api.StreamTypeData) dataStream, err := pf.streamConn.CreateStream(headers) if err != nil { - util.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err)) + runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err)) return } @@ -287,7 +287,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { go func() { // Copy from the remote side to the local port. if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - util.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err)) + runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err)) } // inform the select below that the remote copy is done @@ -300,7 +300,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { // Copy from the local port to the remote side. 
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - util.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err)) + runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err)) // break out of the select below without waiting for the other copy to finish close(localError) } @@ -315,7 +315,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { // always expect something on errorChan (it may be nil) err = <-errorChan if err != nil { - util.HandleError(err) + runtime.HandleError(err) } } @@ -323,7 +323,7 @@ func (pf *PortForwarder) Close() { // stop all listeners for _, l := range pf.listeners { if err := l.Close(); err != nil { - util.HandleError(fmt.Errorf("error closing listener: %v", err)) + runtime.HandleError(fmt.Errorf("error closing listener: %v", err)) } } } diff --git a/pkg/client/unversioned/remotecommand/v2.go b/pkg/client/unversioned/remotecommand/v2.go index 16ae32fab6..67e8637cf8 100644 --- a/pkg/client/unversioned/remotecommand/v2.go +++ b/pkg/client/unversioned/remotecommand/v2.go @@ -24,8 +24,8 @@ import ( "sync" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/httpstream" + "k8s.io/kubernetes/pkg/util/runtime" ) // streamProtocolV2 implements version 2 of the streaming protocol for attach @@ -113,7 +113,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { defer once.Do(func() { remoteStdin.Close() }) if _, err := io.Copy(remoteStdin, e.stdin); err != nil { - util.HandleError(err) + runtime.HandleError(err) } }() @@ -133,7 +133,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { // this "copy" doesn't actually read anything - it's just here to wait for // the server to close remoteStdin. if _, err := io.Copy(ioutil.Discard, remoteStdin); err != nil { - util.HandleError(err) + runtime.HandleError(err) } }() } @@ -143,7 +143,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { go func() { defer wg.Done() if _, err := io.Copy(e.stdout, remoteStdout); err != nil { - util.HandleError(err) + runtime.HandleError(err) } }() } @@ -153,7 +153,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { go func() { defer wg.Done() if _, err := io.Copy(e.stderr, remoteStderr); err != nil { - util.HandleError(err) + runtime.HandleError(err) } }() } diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index f71f286cb5..3b4772c53e 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/workqueue" @@ -183,7 +184,7 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro // Run begins watching and syncing daemon sets. 
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() glog.Infof("Starting Daemon Sets controller manager") controller.SyncAllPodsWithStore(dsc.kubeClient, dsc.podStore.Store) go dsc.dsController.Run(stopCh) @@ -444,7 +445,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) { if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &ds.Spec.Template, ds); err != nil { glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) dsc.expectations.CreationObserved(dsKey) - util.HandleError(err) + utilruntime.HandleError(err) } }(i) } @@ -459,7 +460,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) { if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil { glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) dsc.expectations.DeletionObserved(dsKey) - util.HandleError(err) + utilruntime.HandleError(err) } }(i) } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index f4948dd23d..0ccd0ba4dc 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -40,6 +40,7 @@ import ( utilerrors "k8s.io/kubernetes/pkg/util/errors" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -185,7 +186,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller // Run begins watching and syncing. func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() go dc.dController.Run(stopCh) go dc.rcController.Run(stopCh) go dc.podController.Run(stopCh) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 9e5fb1f722..5e2bda0169 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" @@ -123,14 +124,14 @@ type EndpointController struct { // Runs e; will not return until stopCh is closed. workers determines how many // endpoints will be handled in parallel. 
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() go e.serviceController.Run(stopCh) go e.podController.Run(stopCh) for i := 0; i < workers; i++ { go util.Until(e.worker, time.Second, stopCh) } go func() { - defer util.HandleCrash() + defer utilruntime.HandleCrash() time.Sleep(5 * time.Minute) // give time for our cache to fill e.checkLeftoverEndpoints() }() diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index ca182343aa..59c9693e68 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" ) // Config contains all the settings for a Controller. @@ -79,7 +80,7 @@ func New(c *Config) *Controller { // It's an error to call Run more than once. // Run blocks; call via go. func (c *Controller) Run(stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() r := cache.NewReflector( c.config.ListerWatcher, c.config.ObjectType, diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index d73b211471..b7ff87aedd 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -103,7 +104,7 @@ func (gcc *GCController) gc() { defer wait.Done() if err := gcc.deletePod(namespace, name); err != nil { // ignore not founds - defer util.HandleError(err) + defer utilruntime.HandleError(err) } }(terminatedPods[i].Namespace, terminatedPods[i].Name) } diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 1602f6882e..9ed5037cb7 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -35,6 +35,7 @@ import ( replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -135,7 +136,7 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.Re // Run the main goroutine responsible for watching and syncing jobs. 
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() go jm.jobController.Run(stopCh) go jm.podController.Run(stopCh) for i := 0; i < workers; i++ { @@ -349,7 +350,7 @@ func (jm *JobController) syncJob(key string) error { go func(ix int) { defer wait.Done() if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil { - defer util.HandleError(err) + defer utilruntime.HandleError(err) } }(i) } @@ -469,7 +470,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *ex go func(ix int) { defer wait.Done() if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil { - defer util.HandleError(err) + defer utilruntime.HandleError(err) // Decrement the expected number of deletes because the informer won't observe this deletion jm.expectations.DeletionObserved(jobKey) activeLock.Lock() @@ -514,7 +515,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *ex go func() { defer wait.Done() if err := jm.podControl.CreatePods(job.Namespace, &job.Spec.Template, job); err != nil { - defer util.HandleError(err) + defer utilruntime.HandleError(err) // Decrement the expected number of creates because the informer won't observe this pod jm.expectations.CreationObserved(jobKey) activeLock.Lock() diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 0176f7bdeb..c7ad7c5824 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -27,7 +27,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" @@ -70,12 +70,12 @@ func NewNamespaceController(kubeClient client.Interface, versions *unversioned.A glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t) time.Sleep(time.Duration(t) * time.Second) if err := controller.Requeue(namespace); err != nil { - util.HandleError(err) + utilruntime.HandleError(err) } }() return } - util.HandleError(err) + utilruntime.HandleError(err) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -87,12 +87,12 @@ func NewNamespaceController(kubeClient client.Interface, versions *unversioned.A glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t) time.Sleep(time.Duration(t) * time.Second) if err := controller.Requeue(namespace); err != nil { - util.HandleError(err) + utilruntime.HandleError(err) } }() return } - util.HandleError(err) + utilruntime.HandleError(err) } }, }, diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 5a426a3319..74fe5b188b 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/watch" @@ -241,7 +242,7 @@ func (nc *NodeController) Run(period time.Duration) { nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { remaining, err := nc.deletePods(value.Value) if err != nil { - util.HandleError(fmt.Errorf("unable to evict 
node %q: %v", value.Value, err)) + utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) return false, 0 } @@ -260,7 +261,7 @@ func (nc *NodeController) Run(period time.Duration) { nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) { completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt) if err != nil { - util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err)) + utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err)) return false, 0 } @@ -333,7 +334,7 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) { // this can only happen if the Store.KeyFunc has a problem creating // a key for the pod. If it happens once, it will happen again so // don't bother requeuing the pod. - util.HandleError(err) + utilruntime.HandleError(err) return } @@ -366,7 +367,7 @@ func forcefullyDeletePod(c client.Interface, pod *api.Pod) { var zero int64 err := c.Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero}) if err != nil { - util.HandleError(err) + utilruntime.HandleError(err) } } @@ -456,7 +457,7 @@ func (nc *NodeController) monitorNodeStatus() error { if readyCondition.Status != api.ConditionTrue && lastReadyCondition.Status == api.ConditionTrue { nc.recordNodeStatusChange(node, "NodeNotReady") if err = nc.markAllPodsNotReady(node.Name); err != nil { - util.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err)) + utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err)) } } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 23296c2e92..ddd0ae6518 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -182,7 +183,7 @@ func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) // Run begins watching and syncing. 
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() go rsc.rsController.Run(stopCh) go rsc.podController.Run(stopCh) for i := 0; i < workers; i++ { @@ -360,7 +361,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) rsc.expectations.CreationObserved(rsKey) - util.HandleError(err) + utilruntime.HandleError(err) } }() } @@ -388,7 +389,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext // Decrement the expected number of deletes because the informer won't observe this deletion glog.V(2).Infof("Failed deletion, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) rsc.expectations.DeletionObserved(rsKey) - util.HandleError(err) + utilruntime.HandleError(err) } }(i) } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 22483dffdd..93b62394b7 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -184,7 +185,7 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) { // Run begins watching and syncing. func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() glog.Infof("Starting RC Manager") controller.SyncAllPodsWithStore(rm.kubeClient, rm.podStore.Store) go rm.rcController.Run(stopCh) @@ -364,7 +365,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) rm.expectations.CreationObserved(rcKey) - util.HandleError(err) + utilruntime.HandleError(err) } }() } @@ -392,7 +393,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re // Decrement the expected number of deletes because the informer won't observe this deletion glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) rm.expectations.DeletionObserved(rcKey) - util.HandleError(err) + utilruntime.HandleError(err) } }(i) } diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 939d00783a..6dbfa46445 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -153,7 +154,7 @@ func (rq *ResourceQuotaController) worker() { defer rq.queue.Done(key) err := rq.syncHandler(key.(string)) if err != nil { - util.HandleError(err) + utilruntime.HandleError(err) } }() } @@ -161,7 +162,7 @@ func (rq 
*ResourceQuotaController) worker() { // Run begins quota controller using the specified number of workers func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() go rq.rqController.Run(stopCh) go rq.podController.Run(stopCh) for i := 0; i < workers; i++ { diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 4f0a0df959..4cc83a549a 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -32,7 +32,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) const ( @@ -188,7 +188,7 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) { time.Sleep(processingRetryInterval) serviceQueue.AddIfNotPresent(deltas) } else if err != nil { - util.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err)) + runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err)) } } } diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index dce9f4f529..e64c9522bd 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -31,7 +31,7 @@ import ( "k8s.io/kubernetes/pkg/registry/secret" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/serviceaccount" - "k8s.io/kubernetes/pkg/util" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" @@ -255,7 +255,7 @@ func (e *TokensController) secretDeleted(obj interface{}) { if err := client.RetryOnConflict(RemoveTokenBackoff, func() error { return e.removeSecretReferenceIfNeeded(serviceAccount, secret.Name) }); err != nil { - util.HandleError(err) + utilruntime.HandleError(err) } } diff --git a/pkg/genericapiserver/genericapiserver.go b/pkg/genericapiserver/genericapiserver.go index 538094b215..ac1f125aa5 100644 --- a/pkg/genericapiserver/genericapiserver.go +++ b/pkg/genericapiserver/genericapiserver.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/ui" "k8s.io/kubernetes/pkg/util" utilnet "k8s.io/kubernetes/pkg/util/net" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" systemd "github.com/coreos/go-systemd/daemon" @@ -617,7 +618,7 @@ func (s *GenericAPIServer) Run(options *ServerRunOptions) { } go func() { - defer util.HandleCrash() + defer utilruntime.HandleCrash() for { // err == systemd.SdNotifyNoSocket when not running on a systemd system if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket { diff --git a/pkg/kubelet/cadvisor/cadvisor_linux.go b/pkg/kubelet/cadvisor/cadvisor_linux.go index ea55b722fb..dbc351fd83 100644 --- a/pkg/kubelet/cadvisor/cadvisor_linux.go +++ b/pkg/kubelet/cadvisor/cadvisor_linux.go @@ -34,7 +34,7 @@ import ( cadvisorapiv2 "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/manager" "github.com/google/cadvisor/utils/sysfs" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) type cadvisorClient struct { @@ -119,7 +119,7 @@ func (cc *cadvisorClient) exportHTTP(port uint) error { // If export failed, retry in the background until we are able to bind. // This allows an existing cAdvisor to be killed before this one registers. 
go func() { - defer util.HandleCrash() + defer runtime.HandleCrash() err := serv.ListenAndServe() for err != nil { diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index f2fd7d4c49..d4130f028f 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -54,6 +54,7 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" utilstrings "k8s.io/kubernetes/pkg/util/strings" ) @@ -1297,7 +1298,7 @@ func (dm *DockerManager) killPodWithSyncResult(pod *api.Pod, runningPod kubecont wg.Add(len(runningPod.Containers)) for _, container := range runningPod.Containers { go func(container *kubecontainer.Container) { - defer util.HandleCrash() + defer utilruntime.HandleCrash() defer wg.Done() var containerSpec *api.Container @@ -1418,7 +1419,7 @@ func (dm *DockerManager) killContainer(containerID kubecontainer.ContainerID, co done := make(chan struct{}) go func() { defer close(done) - defer util.HandleCrash() + defer utilruntime.HandleCrash() if err := dm.runner.Run(containerID, pod, container, container.Lifecycle.PreStop); err != nil { glog.Errorf("preStop hook for container %q failed: %v", name, err) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0947aa6aa3..54d5c6f132 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -76,6 +76,7 @@ import ( nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/selinux" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/validation/field" @@ -1632,7 +1633,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont // Kill pods we can't run. 
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil { if err := kl.killPod(pod, runningPod); err != nil { - util.HandleError(err) + utilruntime.HandleError(err) } return err } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 715e4e9489..df7a2c686b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -57,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/bandwidth" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" @@ -64,7 +65,7 @@ import ( ) func init() { - util.ReallyCrash = true + utilruntime.ReallyCrash = true } const testKubeletHostname = "127.0.0.1" diff --git a/pkg/kubelet/oom_watcher.go b/pkg/kubelet/oom_watcher.go index 2f9feaddf5..12dd2c48f5 100644 --- a/pkg/kubelet/oom_watcher.go +++ b/pkg/kubelet/oom_watcher.go @@ -24,7 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/kubelet/cadvisor" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) type OOMWatcher interface { @@ -60,7 +60,7 @@ func (ow *realOOMWatcher) Start(ref *api.ObjectReference) error { } go func() { - defer util.HandleCrash() + defer runtime.HandleCrash() for event := range eventChannel.GetChannel() { glog.V(2).Infof("Got sys oom event from cadvisor: %v", event) diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 36d8178fe5..5b93137d93 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -27,7 +27,7 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) // PodWorkers is an abstract interface for testability. @@ -171,7 +171,7 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kube // the status of the pod for the first pod worker sync. See corresponding // comment in syncPod. go func() { - defer util.HandleCrash() + defer runtime.HandleCrash() p.managePodLoop(podUpdates) }() } diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go index b66a5891e9..b915aa424c 100644 --- a/pkg/kubelet/prober/manager_test.go +++ b/pkg/kubelet/prober/manager_test.go @@ -27,11 +27,12 @@ import ( "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" ) func init() { - util.ReallyCrash = true + runtime.ReallyCrash = true } var defaultProbe *api.Probe = &api.Probe{ diff --git a/pkg/kubelet/prober/worker.go b/pkg/kubelet/prober/worker.go index 5880ac7c42..60f4c6d674 100644 --- a/pkg/kubelet/prober/worker.go +++ b/pkg/kubelet/prober/worker.go @@ -24,7 +24,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/util/format" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) // worker handles the periodic probing of its assigned container. Each worker has a go-routine @@ -120,7 +120,7 @@ probeLoop: // doProbe probes the container once and records the result. // Returns whether the worker should continue. 
func (w *worker) doProbe() (keepGoing bool) { - defer util.HandleCrash(func(_ interface{}) { keepGoing = true }) + defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true }) status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID) if !ok { diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index cd6b423623..1dd1dc3cff 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -32,11 +32,12 @@ import ( "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" ) func init() { - util.ReallyCrash = true + runtime.ReallyCrash = true } func TestDoProbe(t *testing.T) { @@ -251,7 +252,7 @@ func TestCleanUp(t *testing.T) { } func TestHandleCrash(t *testing.T) { - util.ReallyCrash = false // Test that we *don't* really crash. + runtime.ReallyCrash = false // Test that we *don't* really crash. m := newTestManager() w := newTestWorker(m, readiness, api.Probe{}) diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 0f61d40b42..2d519bac8f 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -49,11 +49,11 @@ import ( "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flushwriter" "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/limitwriter" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wsstream" ) @@ -763,7 +763,7 @@ func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder Po // negotiated protocol isn't currently used server side, but could be in the future if err != nil { // Handshake writes the error to the client - util.HandleError(err) + utilruntime.HandleError(err) return } @@ -865,7 +865,7 @@ func (h *portForwardStreamHandler) monitorStreamPair(p *portForwardStreamPair, t select { case <-timeout: err := fmt.Errorf("(conn=%p, request=%s) timed out waiting for streams", h.conn, p.requestID) - util.HandleError(err) + utilruntime.HandleError(err) p.printError(err.Error()) case <-p.complete: glog.V(5).Infof("(conn=%p, request=%s) successfully received error and data streams", h.conn, p.requestID) @@ -949,7 +949,7 @@ Loop: } if complete, err := p.add(stream); err != nil { msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err) - util.HandleError(errors.New(msg)) + utilruntime.HandleError(errors.New(msg)) p.printError(msg) } else if complete { go h.portForward(p) @@ -973,7 +973,7 @@ func (h *portForwardStreamHandler) portForward(p *portForwardStreamPair) { if err != nil { msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err) - util.HandleError(msg) + utilruntime.HandleError(msg) fmt.Fprint(p.errorStream, msg.Error()) } } diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 41071cf4b2..ee4672f3ee 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/intstr" utilnet "k8s.io/kubernetes/pkg/util/net" + "k8s.io/kubernetes/pkg/util/runtime" ) // Controller is the controller manager for the core bootstrap Kubernetes controller @@ -103,7 +104,7 @@ func (c *Controller) RunKubernetesService(ch chan struct{}) { // run, ports and type will be corrected only during // 
start. if err := c.UpdateKubernetesService(false); err != nil { - util.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err)) + runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err)) } }, c.EndpointInterval, ch) } diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index f06210cba3..f4b01feca5 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -29,11 +29,11 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" utilnet "k8s.io/kubernetes/pkg/util/net" utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/iptables" + "k8s.io/kubernetes/pkg/util/runtime" ) type portal struct { @@ -335,7 +335,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) go func(service proxy.ServicePortName, proxier *Proxier) { - defer util.HandleCrash() + defer runtime.HandleCrash() atomic.AddInt32(&proxier.numProxyLoops, 1) sock.ProxyLoop(service, si, proxier) atomic.AddInt32(&proxier.numProxyLoops, -1) diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 1d9c8ade3e..34b01a9fc4 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -31,8 +31,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" ipttest "k8s.io/kubernetes/pkg/util/iptables/testing" + "k8s.io/kubernetes/pkg/util/runtime" ) const ( @@ -87,7 +87,7 @@ var udpServerPort int func init() { // Don't handle panics - util.ReallyCrash = true + runtime.ReallyCrash = true // TCP setup. // TODO: Close() this when fix #19254 diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index 0dd2d040d9..97f42cc73e 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -28,7 +28,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" ) // Abstraction over TCP/UDP sockets which are proxied. 
@@ -259,7 +259,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne } activeClients.clients[cliAddr.String()] = svrConn go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { - defer util.HandleCrash() + defer runtime.HandleCrash() udp.proxyClient(cliAddr, svrConn, activeClients, timeout) }(cliAddr, svrConn, activeClients, timeout) } diff --git a/pkg/registry/service/ipallocator/controller/repair.go b/pkg/registry/service/ipallocator/controller/repair.go index 2cf212e4c1..c8fc332f32 100644 --- a/pkg/registry/service/ipallocator/controller/repair.go +++ b/pkg/registry/service/ipallocator/controller/repair.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/registry/service" "k8s.io/kubernetes/pkg/registry/service/ipallocator" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" ) @@ -67,7 +68,7 @@ func NewRepair(interval time.Duration, registry service.Registry, network *net.I func (c *Repair) RunUntil(ch chan struct{}) { util.Until(func() { if err := c.RunOnce(); err != nil { - util.HandleError(err) + runtime.HandleError(err) } }, c.interval, ch) } @@ -113,7 +114,7 @@ func (c *Repair) runOnce() error { ip := net.ParseIP(svc.Spec.ClusterIP) if ip == nil { // cluster IP is broken, reallocate - util.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace)) + runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace)) continue } switch err := r.Allocate(ip); err { @@ -121,11 +122,11 @@ func (c *Repair) runOnce() error { case ipallocator.ErrAllocated: // TODO: send event // cluster IP is broken, reallocate - util.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace)) + runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace)) case ipallocator.ErrNotInRange: // TODO: send event // cluster IP is broken, reallocate - util.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network)) + runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network)) case ipallocator.ErrFull: // TODO: send event return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", r) diff --git a/pkg/registry/service/portallocator/controller/repair.go b/pkg/registry/service/portallocator/controller/repair.go index d216f6b294..7430a7058f 100644 --- a/pkg/registry/service/portallocator/controller/repair.go +++ b/pkg/registry/service/portallocator/controller/repair.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/registry/service/portallocator" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/net" + "k8s.io/kubernetes/pkg/util/runtime" ) // See ipallocator/controller/repair.go; this is a copy for ports. @@ -52,7 +53,7 @@ func NewRepair(interval time.Duration, registry service.Registry, portRange net. 
diff --git a/pkg/registry/service/portallocator/controller/repair.go b/pkg/registry/service/portallocator/controller/repair.go
index d216f6b294..7430a7058f 100644
--- a/pkg/registry/service/portallocator/controller/repair.go
+++ b/pkg/registry/service/portallocator/controller/repair.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/kubernetes/pkg/registry/service/portallocator"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/net"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )
 // See ipallocator/controller/repair.go; this is a copy for ports.
@@ -52,7 +53,7 @@ func NewRepair(interval time.Duration, registry service.Registry, portRange net.
 func (c *Repair) RunUntil(ch chan struct{}) {
 	util.Until(func() {
 		if err := c.RunOnce(); err != nil {
-			util.HandleError(err)
+			runtime.HandleError(err)
 		}
 	}, c.interval, ch)
 }
@@ -107,11 +108,11 @@ func (c *Repair) runOnce() error {
 		case portallocator.ErrAllocated:
 			// TODO: send event
 			// port is broken, reallocate
-			util.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
+			runtime.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
 		case portallocator.ErrNotInRange:
 			// TODO: send event
 			// port is broken, reallocate
-			util.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
+			runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
 		case portallocator.ErrFull:
 			// TODO: send event
 			return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go
index 0411afe8e3..40532b42d0 100644
--- a/pkg/storage/etcd/etcd_watcher.go
+++ b/pkg/storage/etcd/etcd_watcher.go
@@ -27,7 +27,7 @@ import (
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/storage"
 	etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
-	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/watch"
 	etcd "github.com/coreos/etcd/client"
@@ -145,7 +145,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
 // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
 // as a goroutine.
 func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	defer close(w.etcdError)
 	defer close(w.etcdIncoming)
@@ -211,7 +211,7 @@ func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key stri
 	resp, err := client.Get(ctx, key, &opts)
 	if err != nil {
 		if !etcdutil.IsEtcdNotFound(err) {
-			util.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
+			utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
 			return resourceVersion, err
 		}
 		if etcdError, ok := err.(etcd.Error); ok {
@@ -247,7 +247,7 @@ var (
 // called as a goroutine.
 func (w *etcdWatcher) translate() {
 	defer close(w.outgoing)
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	for {
 		select {
@@ -309,7 +309,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
 	// ensure resource version is set on the object we load from etcd
 	if w.versioner != nil {
 		if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
-			util.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
+			utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
 		}
 	}
@@ -317,7 +317,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
 	if w.transform != nil {
 		obj, err = w.transform(obj)
 		if err != nil {
-			util.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
+			utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
 			return nil, err
 		}
 	}
@@ -330,7 +330,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
 func (w *etcdWatcher) sendAdd(res *etcd.Response) {
 	if res.Node == nil {
-		util.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
+		utilruntime.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
 		return
 	}
 	if w.include != nil && !w.include(res.Node.Key) {
@@ -338,7 +338,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
 	}
 	obj, err := w.decodeObject(res.Node)
 	if err != nil {
-		util.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
+		utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
 		// TODO: expose an error through watch.Interface?
 		// Ignore this value. If we stop the watch on a bad value, a client that uses
 		// the resourceVersion to resume will never be able to get past a bad value.
@@ -367,7 +367,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
 	}
 	curObj, err := w.decodeObject(res.Node)
 	if err != nil {
-		util.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
+		utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
 		// TODO: expose an error through watch.Interface?
 		// Ignore this value. If we stop the watch on a bad value, a client that uses
 		// the resourceVersion to resume will never be able to get past a bad value.
@@ -407,7 +407,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
 func (w *etcdWatcher) sendDelete(res *etcd.Response) {
 	if res.PrevNode == nil {
-		util.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
+		utilruntime.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
 		return
 	}
 	if w.include != nil && !w.include(res.PrevNode.Key) {
@@ -422,7 +422,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
 	}
 	obj, err := w.decodeObject(&node)
 	if err != nil {
-		util.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
+		utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
 		// TODO: expose an error through watch.Interface?
 		// Ignore this value. If we stop the watch on a bad value, a client that uses
 		// the resourceVersion to resume will never be able to get past a bad value.
@@ -446,7 +446,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
 	case EtcdDelete, EtcdExpire:
 		w.sendDelete(res)
 	default:
-		util.HandleError(fmt.Errorf("unknown action: %v", res.Action))
+		utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action))
 	}
 }
diff --git a/pkg/util/httpstream/spdy/upgrade.go b/pkg/util/httpstream/spdy/upgrade.go
index 4fd2a40521..b7d126a5d8 100644
--- a/pkg/util/httpstream/spdy/upgrade.go
+++ b/pkg/util/httpstream/spdy/upgrade.go
@@ -21,8 +21,8 @@ import (
 	"net/http"
 	"strings"
-	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/httpstream"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )
 const HeaderSpdy31 = "SPDY/3.1"
@@ -64,13 +64,13 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque
 	conn, _, err := hijacker.Hijack()
 	if err != nil {
-		util.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err))
+		runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err))
 		return nil
 	}
 	spdyConn, err := NewServerConnection(conn, newStreamHandler)
 	if err != nil {
-		util.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err))
+		runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err))
 		return nil
 	}
diff --git a/pkg/util/runtime/runtime.go b/pkg/util/runtime/runtime.go
new file mode 100644
index 0000000000..76d7cb4649
--- /dev/null
+++ b/pkg/util/runtime/runtime.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2014 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package runtime
+
+import (
+	"fmt"
+	"github.com/golang/glog"
+	"runtime"
+)
+
+// For testing, bypass HandleCrash.
+var ReallyCrash bool
+
+// PanicHandlers is a list of functions which will be invoked when a panic happens.
+var PanicHandlers = []func(interface{}){logPanic}
+
+//TODO search the public functions
+// HandleCrash simply catches a crash and logs an error. Meant to be called via defer.
+// Additional context-specific handlers can be provided, and will be called in case of panic
+func HandleCrash(additionalHandlers ...func(interface{})) {
+	if ReallyCrash {
+		return
+	}
+	if r := recover(); r != nil {
+		for _, fn := range PanicHandlers {
+			fn(r)
+		}
+		for _, fn := range additionalHandlers {
+			fn(r)
+		}
+	}
+}
+
+// logPanic logs the caller tree when a panic occurs.
+func logPanic(r interface{}) {
+	callers := ""
+	for i := 0; true; i++ {
+		_, file, line, ok := runtime.Caller(i)
+		if !ok {
+			break
+		}
+		callers = callers + fmt.Sprintf("%v:%v\n", file, line)
+	}
+	glog.Errorf("Recovered from panic: %#v (%v)\n%v", r, r, callers)
+}
+
+// ErrorHandlers is a list of functions which will be invoked when an unreturnable
+// error occurs.
+var ErrorHandlers = []func(error){logError}
+
+// HandleError is a method to invoke when a non-user facing piece of code cannot
+// return an error and needs to indicate it has been ignored. Invoking this method
+// is preferable to logging the error - the default behavior is to log but the
+// errors may be sent to a remote server for analysis.
+func HandleError(err error) {
+	for _, fn := range ErrorHandlers {
+		fn(err)
+	}
+}
+
+// logError prints an error with the call stack of the location it was reported
+func logError(err error) {
+	glog.ErrorDepth(2, err)
+}
diff --git a/pkg/util/runtime/runtime_test.go b/pkg/util/runtime/runtime_test.go
new file mode 100644
index 0000000000..83a07d04d9
--- /dev/null
+++ b/pkg/util/runtime/runtime_test.go
@@ -0,0 +1,69 @@
+/*
+Copyright 2014 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package runtime
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestHandleCrash(t *testing.T) {
+	count := 0
+	expect := 10
+	for i := 0; i < expect; i = i + 1 {
+		defer HandleCrash()
+		if i%2 == 0 {
+			panic("Test Panic")
+		}
+		count = count + 1
+	}
+	if count != expect {
+		t.Errorf("Expected %d iterations, found %d", expect, count)
+	}
+}
+func TestCustomHandleCrash(t *testing.T) {
+	old := PanicHandlers
+	defer func() { PanicHandlers = old }()
+	var result interface{}
+	PanicHandlers = []func(interface{}){
+		func(r interface{}) {
+			result = r
+		},
+	}
+	func() {
+		defer HandleCrash()
+		panic("test")
+	}()
+	if result != "test" {
+		t.Errorf("did not receive custom handler")
+	}
+}
+func TestCustomHandleError(t *testing.T) {
+	old := ErrorHandlers
+	defer func() { ErrorHandlers = old }()
+	var result error
+	ErrorHandlers = []func(error){
+		func(err error) {
+			result = err
+		},
+	}
+	err := fmt.Errorf("test")
+	HandleError(err)
+	if result != err {
+		t.Errorf("did not receive custom handler")
+	}
+}
diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go
index cdbd67bc3f..64ced17b67 100644
--- a/pkg/util/ssh.go
+++ b/pkg/util/ssh.go
@@ -34,6 +34,7 @@ import (
 	"github.com/golang/glog"
 	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/crypto/ssh"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )
 var (
@@ -289,7 +290,7 @@ func (l *SSHTunnelList) Close() {
 	for ix := range l.entries {
 		entry := l.entries[ix]
 		go func() {
-			defer HandleCrash()
+			defer runtime.HandleCrash()
 			time.Sleep(1 * time.Minute)
 			if err := entry.Tunnel.Close(); err != nil {
 				glog.Errorf("Failed to close tunnel %v: %v", entry, err)
diff --git a/pkg/util/util.go b/pkg/util/util.go
index fb887b872e..48cc67c711 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -22,19 +22,14 @@ import (
 	"os"
 	"reflect"
 	"regexp"
-	"runtime"
 	"strconv"
 	"strings"
 	"time"
 	"k8s.io/kubernetes/pkg/util/intstr"
-
-	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )
-// For testing, bypass HandleCrash.
-var ReallyCrash bool
-
 // For any test of the style:
 //   ...
 //   <- time.After(timeout):
@@ -44,57 +39,6 @@ var ReallyCrash bool
 // (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test.
 var ForeverTestTimeout = time.Second * 30
-// PanicHandlers is a list of functions which will be invoked when a panic happens.
-var PanicHandlers = []func(interface{}){logPanic}
-
-// HandleCrash simply catches a crash and logs an error. Meant to be called via defer.
-// Additional context-specific handlers can be provided, and will be called in case of panic
-func HandleCrash(additionalHandlers ...func(interface{})) {
-	if ReallyCrash {
-		return
-	}
-	if r := recover(); r != nil {
-		for _, fn := range PanicHandlers {
-			fn(r)
-		}
-		for _, fn := range additionalHandlers {
-			fn(r)
-		}
-	}
-}
-
-// logPanic logs the caller tree when a panic occurs.
-func logPanic(r interface{}) {
-	callers := ""
-	for i := 0; true; i++ {
-		_, file, line, ok := runtime.Caller(i)
-		if !ok {
-			break
-		}
-		callers = callers + fmt.Sprintf("%v:%v\n", file, line)
-	}
-	glog.Errorf("Recovered from panic: %#v (%v)\n%v", r, r, callers)
-}
-
-// ErrorHandlers is a list of functions which will be invoked when an unreturnable
-// error occurs.
-var ErrorHandlers = []func(error){logError}
-
-// HandlerError is a method to invoke when a non-user facing piece of code cannot
-// return an error and needs to indicate it has been ignored. Invoking this method
-// is preferable to logging the error - the default behavior is to log but the
-// errors may be sent to a remote server for analysis.
-func HandleError(err error) {
-	for _, fn := range ErrorHandlers {
-		fn(err)
-	}
-}
-
-// logError prints an error with the call stack of the location it was reported
-func logError(err error) {
-	glog.ErrorDepth(2, err)
-}
-
 // NeverStop may be passed to Until to make it never stop.
 var NeverStop <-chan struct{} = make(chan struct{})
@@ -116,7 +60,7 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
 	for {
 		func() {
-			defer HandleCrash()
+			defer runtime.HandleCrash()
 			f()
 		}()
 		select {
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index cd4605ca1e..b5ef5b682e 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -17,7 +17,6 @@ limitations under the License.
 package util
 import (
-	"fmt"
 	"testing"
 	"time"
 )
@@ -53,55 +52,6 @@ func TestUntilReturnsImmediately(t *testing.T) {
 	}
 }
-func TestHandleCrash(t *testing.T) {
-	count := 0
-	expect := 10
-	for i := 0; i < expect; i = i + 1 {
-		defer HandleCrash()
-		if i%2 == 0 {
-			panic("Test Panic")
-		}
-		count = count + 1
-	}
-	if count != expect {
-		t.Errorf("Expected %d iterations, found %d", expect, count)
-	}
-}
-
-func TestCustomHandleCrash(t *testing.T) {
-	old := PanicHandlers
-	defer func() { PanicHandlers = old }()
-	var result interface{}
-	PanicHandlers = []func(interface{}){
-		func(r interface{}) {
-			result = r
-		},
-	}
-	func() {
-		defer HandleCrash()
-		panic("test")
-	}()
-	if result != "test" {
-		t.Errorf("did not receive custom handler")
-	}
-}
-
-func TestCustomHandleError(t *testing.T) {
-	old := ErrorHandlers
-	defer func() { ErrorHandlers = old }()
-	var result error
-	ErrorHandlers = []func(error){
-		func(err error) {
-			result = err
-		},
-	}
-	err := fmt.Errorf("test")
-	HandleError(err)
-	if result != err {
-		t.Errorf("did not receive custom handler")
-	}
-}
-
 func TestStringDiff(t *testing.T) {
 	diff := StringDiff("aaabb", "aaacc")
 	expect := "aaa\n\nA: bb\n\nB: cc\n\n"
diff --git a/pkg/util/wsstream/conn.go b/pkg/util/wsstream/conn.go
index 719cf191a6..3934a5b5b7 100644
--- a/pkg/util/wsstream/conn.go
+++ b/pkg/util/wsstream/conn.go
@@ -27,7 +27,7 @@ import (
 	"github.com/golang/glog"
 	"golang.org/x/net/websocket"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )
 // The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating
@@ -92,7 +92,7 @@ func IsWebSocketRequest(req *http.Request) bool {
 // ignoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
 // read and write deadlines are pushed every time a new message is received.
 func ignoreReceives(ws *websocket.Conn, timeout time.Duration) {
-	defer util.HandleCrash()
+	defer runtime.HandleCrash()
 	var data []byte
 	for {
 		resetTimeout(ws, timeout)
@@ -163,7 +163,7 @@ func (conn *Conn) SetIdleTimeout(duration time.Duration) {
 // Open the connection and create channels for reading and writing.
 func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) ([]io.ReadWriteCloser, error) {
 	go func() {
-		defer util.HandleCrash()
+		defer runtime.HandleCrash()
 		defer conn.Close()
 		websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req)
 	}()
diff --git a/pkg/util/wsstream/stream.go b/pkg/util/wsstream/stream.go
index 5b3827f04c..846d6c3a1b 100644
--- a/pkg/util/wsstream/stream.go
+++ b/pkg/util/wsstream/stream.go
@@ -23,7 +23,7 @@ import (
 	"time"
 	"golang.org/x/net/websocket"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )
 // The WebSocket subprotocol "binary.k8s.io" will only send messages to the
@@ -71,7 +71,7 @@ func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
 // method completes.
 func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
 	go func() {
-		defer util.HandleCrash()
+		defer runtime.HandleCrash()
 		websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
 	}()
 	return <-r.err
diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go
index fddf731235..6be17336ad 100644
--- a/pkg/volume/gce_pd/gce_util.go
+++ b/pkg/volume/gce_pd/gce_util.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/exec"
 	"k8s.io/kubernetes/pkg/util/keymutex"
+	"k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/volume"
 )
@@ -223,7 +224,7 @@ func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, er
 // This function is intended to be called asynchronously as a go routine.
 func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
 	glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.pdName)
-	defer util.HandleCrash()
+	defer runtime.HandleCrash()
 	// Block execution until any pending attach/detach operations for this PD have completed
 	attachDetachMutex.LockKey(c.pdName)
diff --git a/pkg/watch/iowatcher.go b/pkg/watch/iowatcher.go
index 5d9ac54e7c..505e6bfcab 100644
--- a/pkg/watch/iowatcher.go
+++ b/pkg/watch/iowatcher.go
@@ -22,8 +22,8 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/net"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 )
 // Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
@@ -88,7 +88,7 @@ func (sw *StreamWatcher) stopping() bool {
 func (sw *StreamWatcher) receive() {
 	defer close(sw.result)
 	defer sw.Stop()
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	for {
 		action, obj, err := sw.source.Decode()
 		if err != nil {
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 328fedd11f..1ecfbbaf15 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -32,7 +32,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/plugin/pkg/scheduler"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@@ -343,7 +343,7 @@ func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue
 		// Retry asynchronously.
 		// Note that this is extremely rudimentary and we need a more real error handling path.
 		go func() {
-			defer util.HandleCrash()
+			defer runtime.HandleCrash()
 			podID := types.NamespacedName{
 				Namespace: pod.Namespace,
 				Name:      pod.Name,
diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go
index 98c34ba133..80bcdbebe2 100644
--- a/test/e2e/e2e.go
+++ b/test/e2e/e2e.go
@@ -37,6 +37,7 @@ import (
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )
 const (
@@ -206,7 +207,7 @@ var _ = ginkgo.SynchronizedAfterSuite(func() {
 // generated in this directory, and cluster logs will also be saved.
 // This function is called on each Ginkgo node in parallel mode.
 func RunE2ETests(t *testing.T) {
-	util.ReallyCrash = true
+	runtime.ReallyCrash = true
 	util.InitLogs()
 	defer util.FlushLogs()
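The new pkg/util/runtime package centralizes the crash and error hooks that every hunk above migrates to. A brief, self-contained sketch of how a caller is expected to use the relocated helpers, based only on the API added in runtime.go above; the appended handlers and the panicking worker goroutine are illustrative and not part of this diff:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/runtime"
)

func main() {
	// Optional: extend the global handler chains, as callers of the old
	// util package did. These particular hooks are illustrative only.
	runtime.PanicHandlers = append(runtime.PanicHandlers, func(r interface{}) {
		fmt.Println("panic observed:", r)
	})
	runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) {
		fmt.Println("error observed:", err)
	})

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Keep a worker goroutine from taking the process down, and run an
		// extra, call-site-specific handler after the global PanicHandlers.
		defer runtime.HandleCrash(func(r interface{}) {
			fmt.Println("worker goroutine recovered")
		})
		panic("boom")
	}()
	<-done

	// Code that cannot return an error reports it rather than dropping it.
	runtime.HandleError(fmt.Errorf("ignored but logged"))

	// In tests, setting runtime.ReallyCrash = true disables the recovery so
	// panics surface as real failures, as the integration and e2e changes do.
}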