mirror of https://github.com/k3s-io/k3s.git
243 lines
8.5 KiB
Go
243 lines
8.5 KiB
Go
package util
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/rancher/wrangler/v3/pkg/merr"
|
|
"github.com/rancher/wrangler/v3/pkg/schemes"
|
|
"github.com/sirupsen/logrus"
|
|
authorizationv1 "k8s.io/api/authorization/v1"
|
|
v1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/dynamic"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1"
|
|
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/record"
|
|
)
|
|
|
|
// This sets a default duration to wait for the apiserver to become ready. This is primarily used to
|
|
// block startup of agent supervisor controllers until the apiserver is ready to serve requests, in the
|
|
// same way that the apiReady channel is used in the server packages, so it can be fairly long. It must
|
|
// be at least long enough for downstream projects like RKE2 to start the apiserver in the background.
|
|
const DefaultAPIServerReadyTimeout = 15 * time.Minute
|
|
|
|
func GetAddresses(endpoint *v1.Endpoints) []string {
|
|
serverAddresses := []string{}
|
|
if endpoint == nil {
|
|
return serverAddresses
|
|
}
|
|
for _, subset := range endpoint.Subsets {
|
|
var port string
|
|
if len(subset.Ports) > 0 {
|
|
port = strconv.Itoa(int(subset.Ports[0].Port))
|
|
}
|
|
if port == "" {
|
|
port = "443"
|
|
}
|
|
for _, address := range subset.Addresses {
|
|
serverAddresses = append(serverAddresses, net.JoinHostPort(address.IP, port))
|
|
}
|
|
}
|
|
return serverAddresses
|
|
}
|
|
|
|
// WaitForAPIServerReady waits for the API server's /readyz endpoint to report "ok" with timeout.
|
|
// This is modified from WaitForAPIServer from the Kubernetes controller-manager app, but checks the
|
|
// readyz endpoint instead of the deprecated healthz endpoint, and supports context.
|
|
func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout time.Duration) error {
|
|
lastErr := errors.New("API server not polled")
|
|
restConfig, err := GetRESTConfig(kubeconfigPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Probe apiserver readiness with a 15 second timeout
|
|
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/cmd/kubeadm/app/util/staticpod/utils.go#L252
|
|
restConfig.Timeout = time.Second * 15
|
|
|
|
// By default, idle connections to the apiserver are returned to a global pool
|
|
// between requests. Explicitly flag this client's request for closure so that
|
|
// we re-dial through the loadbalancer in case the endpoints have changed.
|
|
restConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
|
|
return roundTripFunc(func(req *http.Request) (*http.Response, error) {
|
|
req.Close = true
|
|
return rt.RoundTrip(req)
|
|
})
|
|
})
|
|
|
|
restConfig = dynamic.ConfigFor(restConfig)
|
|
restConfig.GroupVersion = &schema.GroupVersion{}
|
|
restClient, err := rest.RESTClientFor(restConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = wait.PollUntilContextTimeout(ctx, time.Second*2, timeout, true, func(ctx context.Context) (bool, error) {
|
|
// DoRaw returns an error if the response code is < 200 OK or > 206 Partial Content
|
|
if _, err := restClient.Get().AbsPath("/readyz").Param("verbose", "").DoRaw(ctx); err != nil {
|
|
if err.Error() != lastErr.Error() {
|
|
logrus.Infof("Polling for API server readiness: GET /readyz failed: %v", err)
|
|
} else {
|
|
logrus.Debug("Polling for API server readiness: GET /readyz failed: status unchanged")
|
|
}
|
|
lastErr = err
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
})
|
|
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
return merr.NewErrors(err, lastErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// APIServerReadyChan wraps WaitForAPIServerReady, returning a channel that
|
|
// is closed when the apiserver is ready. If the apiserver does not become
|
|
// ready within the expected duration, a fatal error is raised.
|
|
func APIServerReadyChan(ctx context.Context, kubeConfig string, timeout time.Duration) <-chan struct{} {
|
|
ready := make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(ready)
|
|
if err := WaitForAPIServerReady(ctx, kubeConfig, timeout); err != nil {
|
|
logrus.Fatalf("Failed to wait for API server to become ready: %v", err)
|
|
}
|
|
}()
|
|
|
|
return ready
|
|
}
|
|
|
|
type genericAccessReviewRequest func(context.Context) (*authorizationv1.SubjectAccessReviewStatus, error)
|
|
|
|
// WaitForRBACReady polls an AccessReview request until it returns an allowed response. If the user
|
|
// and group are empty, it uses SelfSubjectAccessReview, otherwise SubjectAccessReview is used. It
|
|
// will return an error if the timeout expires, or nil if the SubjectAccessReviewStatus indicates
|
|
// the access would be allowed.
|
|
func WaitForRBACReady(ctx context.Context, kubeconfigPath string, timeout time.Duration, ra authorizationv1.ResourceAttributes, user string, groups ...string) error {
|
|
var lastErr error
|
|
restConfig, err := GetRESTConfig(kubeconfigPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
authClient, err := authorizationv1client.NewForConfig(restConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var reviewFunc genericAccessReviewRequest
|
|
if len(user) == 0 && len(groups) == 0 {
|
|
reviewFunc = selfSubjectAccessReview(authClient, ra)
|
|
} else {
|
|
reviewFunc = subjectAccessReview(authClient, ra, user, groups)
|
|
}
|
|
|
|
err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
|
|
status, rerr := reviewFunc(ctx)
|
|
if rerr != nil {
|
|
lastErr = rerr
|
|
return false, nil
|
|
}
|
|
if status.Allowed {
|
|
return true, nil
|
|
}
|
|
lastErr = errors.New(status.Reason)
|
|
return false, nil
|
|
})
|
|
|
|
if err != nil {
|
|
return merr.NewErrors(err, lastErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CheckRBAC performs a single SelfSubjectAccessReview or SubjectAccessReview, returning a
|
|
// boolean indicating whether or not the requested access would be allowed. This is basically
|
|
// `kubectl auth can-i`.
|
|
func CheckRBAC(ctx context.Context, kubeconfigPath string, ra authorizationv1.ResourceAttributes, user string, groups ...string) (bool, error) {
|
|
restConfig, err := GetRESTConfig(kubeconfigPath)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
authClient, err := authorizationv1client.NewForConfig(restConfig)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
var reviewFunc genericAccessReviewRequest
|
|
if len(user) == 0 && len(groups) == 0 {
|
|
reviewFunc = selfSubjectAccessReview(authClient, ra)
|
|
} else {
|
|
reviewFunc = subjectAccessReview(authClient, ra, user, groups)
|
|
}
|
|
|
|
status, err := reviewFunc(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return status.Allowed, nil
|
|
}
|
|
|
|
// selfSubjectAccessReview returns a function that makes SelfSubjectAccessReview requests using the
|
|
// provided client and attributes, returning a status or error.
|
|
func selfSubjectAccessReview(authClient *authorizationv1client.AuthorizationV1Client, ra authorizationv1.ResourceAttributes) genericAccessReviewRequest {
|
|
return func(ctx context.Context) (*authorizationv1.SubjectAccessReviewStatus, error) {
|
|
r, err := authClient.SelfSubjectAccessReviews().Create(ctx, &authorizationv1.SelfSubjectAccessReview{
|
|
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
|
|
ResourceAttributes: &ra,
|
|
},
|
|
}, metav1.CreateOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &r.Status, nil
|
|
}
|
|
}
|
|
|
|
// subjectAccessReview returns a function that makes SubjectAccessReview requests using the
|
|
// provided client, attributes, user, and group, returning a status or error.
|
|
func subjectAccessReview(authClient *authorizationv1client.AuthorizationV1Client, ra authorizationv1.ResourceAttributes, user string, groups []string) genericAccessReviewRequest {
|
|
return func(ctx context.Context) (*authorizationv1.SubjectAccessReviewStatus, error) {
|
|
r, err := authClient.SubjectAccessReviews().Create(ctx, &authorizationv1.SubjectAccessReview{
|
|
Spec: authorizationv1.SubjectAccessReviewSpec{
|
|
ResourceAttributes: &ra,
|
|
User: user,
|
|
Groups: groups,
|
|
},
|
|
}, metav1.CreateOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &r.Status, nil
|
|
}
|
|
}
|
|
|
|
func BuildControllerEventRecorder(k8s clientset.Interface, controllerName, namespace string) record.EventRecorder {
|
|
logrus.Infof("Creating %s event broadcaster", controllerName)
|
|
eventBroadcaster := record.NewBroadcaster()
|
|
eventBroadcaster.StartStructuredLogging(0)
|
|
eventBroadcaster.StartRecordingToSink(&coregetter.EventSinkImpl{Interface: k8s.CoreV1().Events(namespace)})
|
|
nodeName := os.Getenv("NODE_NAME")
|
|
return eventBroadcaster.NewRecorder(schemes.All, v1.EventSource{Component: controllerName, Host: nodeName})
|
|
}
|
|
|
|
type roundTripFunc func(req *http.Request) (*http.Response, error)
|
|
|
|
func (w roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
return w(req)
|
|
}
|