Update p2p boostrap helpers for Spegel v0.0.30

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 95700aa6b3)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/11764/head
Brad Davidson 2025-01-22 01:51:28 +00:00 committed by Brad Davidson
parent fa05e1b4b2
commit dbe3bbac34
3 changed files with 202 additions and 97 deletions

View File

@ -48,6 +48,9 @@ var (
// ClientOption is a callback to mutate the http client prior to use
type ClientOption func(*http.Client)
// RequestOption is a callback to mutate the http request prior to use
type RequestOption func(*http.Request)
// Info contains fields that track parsed parts of a cluster join token
type Info struct {
*kubeadm.BootstrapTokenString
@ -240,7 +243,7 @@ func parseToken(token string) (*Info, error) {
// If the CA bundle is not empty but does not contain any valid certs, it validates using
// an empty CA bundle (which will always fail).
// If valid cert+key paths can be loaded from the provided paths, they are used for client cert auth.
func GetHTTPClient(cacerts []byte, certFile, keyFile string, option ...ClientOption) *http.Client {
func GetHTTPClient(cacerts []byte, certFile, keyFile string, options ...any) *http.Client {
if len(cacerts) == 0 {
return defaultClient
}
@ -265,8 +268,10 @@ func GetHTTPClient(cacerts []byte, certFile, keyFile string, option ...ClientOpt
},
}
for _, o := range option {
o(client)
for _, o := range options {
if clientOption, ok := o.(ClientOption); ok {
clientOption(client)
}
}
return client
}
@ -278,8 +283,14 @@ func WithTimeout(d time.Duration) ClientOption {
}
}
func WithHeader(k, v string) RequestOption {
return func(r *http.Request) {
r.Header.Add(k, v)
}
}
// Get makes a request to a subpath of info's BaseURL
func (i *Info) Get(path string, option ...ClientOption) ([]byte, error) {
func (i *Info) Get(path string, options ...any) ([]byte, error) {
u, err := url.Parse(i.BaseURL)
if err != nil {
return nil, err
@ -290,11 +301,12 @@ func (i *Info) Get(path string, option ...ClientOption) ([]byte, error) {
}
p.Scheme = u.Scheme
p.Host = u.Host
return get(p.String(), GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token())
client := GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, options...)
return get(p.String(), client, i.Username, i.Password, i.Token(), options...)
}
// Put makes a request to a subpath of info's BaseURL
func (i *Info) Put(path string, body []byte, option ...ClientOption) error {
func (i *Info) Put(path string, body []byte, options ...any) error {
u, err := url.Parse(i.BaseURL)
if err != nil {
return err
@ -305,11 +317,12 @@ func (i *Info) Put(path string, body []byte, option ...ClientOption) error {
}
p.Scheme = u.Scheme
p.Host = u.Host
return put(p.String(), body, GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token())
client := GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, options...)
return put(p.String(), body, client, i.Username, i.Password, i.Token(), options...)
}
// Post makes a request to a subpath of info's BaseURL
func (i *Info) Post(path string, body []byte, option ...ClientOption) ([]byte, error) {
func (i *Info) Post(path string, body []byte, options ...any) ([]byte, error) {
u, err := url.Parse(i.BaseURL)
if err != nil {
return nil, err
@ -320,7 +333,8 @@ func (i *Info) Post(path string, body []byte, option ...ClientOption) ([]byte, e
}
p.Scheme = u.Scheme
p.Host = u.Host
return post(p.String(), body, GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token())
client := GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, options...)
return post(p.String(), body, client, i.Username, i.Password, i.Token(), options...)
}
// setServer sets the BaseURL and CACerts fields of the Info by connecting to the server
@ -402,7 +416,7 @@ func getCACerts(u url.URL) ([]byte, error) {
// get makes a request to a url using a provided client and credentials,
// returning the response body.
func get(u string, client *http.Client, username, password, token string) ([]byte, error) {
func get(u string, client *http.Client, username, password, token string, options ...any) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, u, nil)
if err != nil {
return nil, err
@ -414,6 +428,12 @@ func get(u string, client *http.Client, username, password, token string) ([]byt
req.SetBasicAuth(username, password)
}
for _, o := range options {
if requestOption, ok := o.(RequestOption); ok {
requestOption(req)
}
}
resp, err := client.Do(req)
if err != nil {
return nil, err
@ -424,7 +444,7 @@ func get(u string, client *http.Client, username, password, token string) ([]byt
// put makes a request to a url using a provided client and credentials,
// only an error is returned
func put(u string, body []byte, client *http.Client, username, password, token string) error {
func put(u string, body []byte, client *http.Client, username, password, token string, options ...any) error {
req, err := http.NewRequest(http.MethodPut, u, bytes.NewBuffer(body))
if err != nil {
return err
@ -436,6 +456,12 @@ func put(u string, body []byte, client *http.Client, username, password, token s
req.SetBasicAuth(username, password)
}
for _, o := range options {
if requestOption, ok := o.(RequestOption); ok {
requestOption(req)
}
}
resp, err := client.Do(req)
if err != nil {
return err
@ -447,7 +473,7 @@ func put(u string, body []byte, client *http.Client, username, password, token s
// post makes a request to a url using a provided client and credentials,
// returning the response body and error.
func post(u string, body []byte, client *http.Client, username, password, token string) ([]byte, error) {
func post(u string, body []byte, client *http.Client, username, password, token string, options ...any) ([]byte, error) {
req, err := http.NewRequest(http.MethodPost, u, bytes.NewBuffer(body))
if err != nil {
return nil, err
@ -459,6 +485,12 @@ func post(u string, body []byte, client *http.Client, username, password, token
req.SetBasicAuth(username, password)
}
for _, o := range options {
if requestOption, ok := o.(RequestOption); ok {
requestOption(req)
}
}
resp, err := client.Do(req)
if err != nil {
return nil, err

View File

@ -2,7 +2,7 @@ package spegel
import (
"context"
"math/rand"
"encoding/json"
"os"
"path/filepath"
"strings"
@ -17,6 +17,7 @@ import (
"github.com/rancher/wrangler/v3/pkg/merr"
"github.com/sirupsen/logrus"
"github.com/spegel-org/spegel/pkg/routing"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -39,13 +40,20 @@ func NewSelfBootstrapper() routing.Bootstrapper {
return &selfBootstrapper{}
}
func (s *selfBootstrapper) Run(_ context.Context, id string) error {
func (s *selfBootstrapper) Run(ctx context.Context, id string) error {
s.id = id
return nil
return waitForDone(ctx)
}
func (s *selfBootstrapper) Get() (*peer.AddrInfo, error) {
return peer.AddrInfoFromString(s.id)
func (s *selfBootstrapper) Get(ctx context.Context) ([]peer.AddrInfo, error) {
if s.id == "" {
return nil, errors.New("p2p peer not ready")
}
self, err := peer.AddrInfoFromString(s.id)
if err != nil {
return nil, err
}
return []peer.AddrInfo{*self}, nil
}
type agentBootstrapper struct {
@ -53,6 +61,8 @@ type agentBootstrapper struct {
token string
clientCert string
clientKey string
kubeConfig string
info *clientaccess.Info
}
// NewAgentBootstrapper returns a p2p bootstrapper that retrieves a peer address from its server
@ -60,79 +70,104 @@ func NewAgentBootstrapper(server, token, dataDir string) routing.Bootstrapper {
return &agentBootstrapper{
clientCert: filepath.Join(dataDir, "agent", "client-kubelet.crt"),
clientKey: filepath.Join(dataDir, "agent", "client-kubelet.key"),
kubeConfig: filepath.Join(dataDir, "agent", "kubelet.kubeconfig"),
server: server,
token: token,
}
}
func (c *agentBootstrapper) Run(_ context.Context, _ string) error {
return nil
func (c *agentBootstrapper) Run(ctx context.Context, id string) error {
if c.server != "" && c.token != "" {
withCert := clientaccess.WithClientCertificate(c.clientCert, c.clientKey)
info, err := clientaccess.ParseAndValidateToken(c.server, c.token, withCert)
if err != nil {
return errors.Wrap(err, "failed to validate join token")
}
c.info = info
}
client, err := util.GetClientSet(c.kubeConfig)
if err != nil {
return errors.Wrap(err, "failed to create kubernetes client")
}
nodes := client.CoreV1().Nodes()
go wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return false, nil
}
node, err := nodes.Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
logrus.Debugf("Failed to update P2P address annotations and labels: %v", err)
return false, nil
}
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[P2pAddressAnnotation] = id
if node.Labels == nil {
node.Labels = map[string]string{}
}
node.Labels[P2pEnabledLabel] = "true"
if _, err = nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
logrus.Debugf("Failed to update P2P address annotations and labels: %v", err)
return false, nil
}
logrus.Infof("Node P2P address annotations and labels added: %s", id)
return true, nil
})
return waitForDone(ctx)
}
func (c *agentBootstrapper) Get() (*peer.AddrInfo, error) {
func (c *agentBootstrapper) Get(ctx context.Context) ([]peer.AddrInfo, error) {
if c.server == "" || c.token == "" {
return nil, errors.New("cannot get addresses without server and token")
}
withCert := clientaccess.WithClientCertificate(c.clientCert, c.clientKey)
info, err := clientaccess.ParseAndValidateToken(c.server, c.token, withCert)
if c.info == nil {
return nil, errors.New("client not ready")
}
addr, err := c.info.Get("/v1-"+version.Program+"/p2p", clientaccess.WithHeader("Accept", "application/json"))
if err != nil {
return nil, err
}
addr, err := info.Get("/v1-" + version.Program + "/p2p")
if err != nil {
return nil, err
// If the response cannot be decoded as a JSON list of addresses, fall back
// to using it as a legacy single-address response.
addrs := []string{}
if err := json.Unmarshal(addr, &addrs); err != nil {
addrs = append(addrs, string(addr))
}
addrInfo, err := peer.AddrInfoFromString(string(addr))
return addrInfo, err
addrInfos := []peer.AddrInfo{}
for _, addr := range addrs {
if addrInfo, err := peer.AddrInfoFromString(addr); err == nil {
addrInfos = append(addrInfos, *addrInfo)
}
}
return addrInfos, nil
}
type serverBootstrapper struct {
controlConfig *config.Control
}
// NewServerBootstrapper returns a p2p bootstrapper that returns an address from a random other cluster member.
// NewServerBootstrapper returns a p2p bootstrapper that returns an address from the Kubernetes node list
func NewServerBootstrapper(controlConfig *config.Control) routing.Bootstrapper {
return &serverBootstrapper{
controlConfig: controlConfig,
}
}
func (s *serverBootstrapper) Run(_ context.Context, id string) error {
s.controlConfig.Runtime.ClusterControllerStarts["spegel-p2p"] = func(ctx context.Context) {
nodes := s.controlConfig.Runtime.Core.Core().V1().Node()
_ = wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return false, nil
}
node, err := nodes.Get(nodeName, metav1.GetOptions{})
if err != nil {
return false, nil
}
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[P2pAddressAnnotation] = id
if node.Labels == nil {
node.Labels = map[string]string{}
}
node.Labels[P2pEnabledLabel] = "true"
if _, err = nodes.Update(node); err != nil {
return false, nil
}
logrus.Infof("Node P2P address annotations and labels added: %s", id)
return true, nil
})
}
return nil
func (s *serverBootstrapper) Run(ctx context.Context, _ string) error {
return waitForDone(ctx)
}
func (s *serverBootstrapper) Get() (addrInfo *peer.AddrInfo, err error) {
func (s *serverBootstrapper) Get(ctx context.Context) ([]peer.AddrInfo, error) {
if s.controlConfig.Runtime.Core == nil {
return nil, util.ErrCoreNotReady
}
@ -146,8 +181,9 @@ func (s *serverBootstrapper) Get() (addrInfo *peer.AddrInfo, err error) {
if err != nil {
return nil, err
}
for _, i := range rand.Perm(len(nodeList.Items)) {
node := nodeList.Items[i]
addrs := []peer.AddrInfo{}
for _, node := range nodeList.Items {
if node.Name == nodeName {
// don't return our own address
continue
@ -159,12 +195,12 @@ func (s *serverBootstrapper) Get() (addrInfo *peer.AddrInfo, err error) {
if val, ok := node.Annotations[P2pAddressAnnotation]; ok {
for _, addr := range strings.Split(val, ",") {
if info, err := peer.AddrInfoFromString(addr); err == nil {
return info, nil
addrs = append(addrs, *info)
}
}
}
}
return nil, errors.New("no ready p2p peers found")
return addrs, nil
}
type chainingBootstrapper struct {
@ -172,6 +208,7 @@ type chainingBootstrapper struct {
}
// NewChainingBootstrapper returns a p2p bootstrapper that passes through to a list of bootstrappers.
// Addressess are returned from all boostrappers that return successfully.
func NewChainingBootstrapper(bootstrappers ...routing.Bootstrapper) routing.Bootstrapper {
return &chainingBootstrapper{
bootstrappers: bootstrappers,
@ -179,23 +216,38 @@ func NewChainingBootstrapper(bootstrappers ...routing.Bootstrapper) routing.Boot
}
func (c *chainingBootstrapper) Run(ctx context.Context, id string) error {
errs := merr.Errors{}
for _, b := range c.bootstrappers {
if err := b.Run(ctx, id); err != nil {
errs = append(errs, err)
}
eg, ctx := errgroup.WithContext(ctx)
for i := range c.bootstrappers {
b := c.bootstrappers[i]
eg.Go(func() error {
return b.Run(ctx, id)
})
}
return merr.NewErrors(errs...)
return eg.Wait()
}
func (c *chainingBootstrapper) Get() (*peer.AddrInfo, error) {
func (c *chainingBootstrapper) Get(ctx context.Context) ([]peer.AddrInfo, error) {
errs := merr.Errors{}
for _, b := range c.bootstrappers {
addr, err := b.Get()
if err == nil {
return addr, nil
addrs := []peer.AddrInfo{}
for i := range c.bootstrappers {
b := c.bootstrappers[i]
as, err := b.Get(ctx)
if err != nil {
errs = append(errs, err)
} else {
addrs = append(addrs, as...)
}
errs = append(errs, err)
}
return nil, merr.NewErrors(errs...)
if len(addrs) == 0 {
return nil, merr.NewErrors(errs...)
}
return addrs, nil
}
func waitForDone(ctx context.Context) error {
<-ctx.Done()
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
}

View File

@ -2,6 +2,7 @@ package spegel
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
@ -131,7 +132,7 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
if logrus.IsLevelEnabled(logrus.DebugLevel) {
level = ipfslog.LevelDebug
stdlog := log.New(logrus.StandardLogger().Writer(), "spegel ", log.LstdFlags)
logger := stdr.NewWithOptions(stdlog, stdr.Options{Verbosity: ptr.To(10)})
logger := stdr.NewWithOptions(stdlog, stdr.Options{Verbosity: ptr.To(7)})
ctx = logr.NewContext(ctx, logger)
}
ipfslog.SetAllLoggers(level)
@ -197,7 +198,7 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
}
router, err := routing.NewP2PRouter(ctx, routerAddr, c.Bootstrapper, c.RegistryPort, opts...)
if err != nil {
return errors.Wrap(err, "failed to create p2p router")
return errors.Wrap(err, "failed to create P2P router")
}
go router.Run(ctx)
@ -216,13 +217,10 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
registry.WithLogger(logr.FromContextOrDiscard(ctx)),
}
reg := registry.NewRegistry(ociClient, router, registryOpts...)
regSvr := reg.Server(":" + c.RegistryPort)
// Close router on shutdown
go func() {
<-ctx.Done()
router.Close()
}()
regSvr, err := reg.Server(":" + c.RegistryPort)
if err != nil {
return errors.Wrap(err, "failed to create embedded registry server")
}
// Track images available in containerd and publish via p2p router
go state.Track(ctx, ociClient, router, resolveLatestTag)
@ -232,29 +230,52 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
return err
}
mRouter.PathPrefix("/v2").Handler(regSvr.Handler)
mRouter.PathPrefix("/v1-" + version.Program + "/p2p").Handler(c.peerInfo())
mRouter.PathPrefix("/v1-{program}/p2p").Handler(c.peerInfo())
// Wait up to 5 seconds for the p2p network to find peers. This will return
// immediately if the node is bootstrapping from itself.
_ = wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) {
return router.Ready()
})
if err := wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) {
ready, _ := router.Ready(ctx)
return ready, nil
}); err != nil {
logrus.Warnf("Failed to wait for P2P mesh to become ready, will retry in the background: %v", err)
}
return nil
}
// peerInfo sends a peer address retrieved from the bootstrapper via HTTP
func (c *Config) peerInfo() http.HandlerFunc {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
client, _, _ := net.SplitHostPort(req.RemoteAddr)
info, err := c.Bootstrapper.Get()
if err != nil {
http.Error(resp, "Internal Error", http.StatusInternalServerError)
info, err := c.Bootstrapper.Get(req.Context())
if err != nil || len(info) == 0 {
http.Error(resp, err.Error(), http.StatusInternalServerError)
return
}
logrus.Debugf("Serving p2p peer addr %s to client at %s", info, client)
resp.WriteHeader(http.StatusOK)
addrs := []string{}
for _, ai := range info {
for _, ma := range ai.Addrs {
addrs = append(addrs, fmt.Sprintf("%s/p2p/%s", ma, ai.ID))
}
}
client, _, _ := net.SplitHostPort(req.RemoteAddr)
if req.Header.Get("Accept") == "application/json" {
b, err := json.Marshal(addrs)
if err != nil {
http.Error(resp, err.Error(), http.StatusInternalServerError)
return
}
logrus.Debugf("Serving p2p peer addrs %v to client at %s", addrs, client)
resp.Header().Set("Content-Type", "application/json")
resp.WriteHeader(http.StatusOK)
resp.Write(b)
return
}
logrus.Debugf("Serving p2p peer addr %v to client at %s", addrs[0], client)
resp.Header().Set("Content-Type", "text/plain")
fmt.Fprintf(resp, "%s/p2p/%s", info.Addrs[0].String(), info.ID.String())
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(addrs[0]))
})
}