mirror of https://github.com/k3s-io/k3s.git
Use NodeSelector when node label is enabled
parent
ac2114eafe
commit
f293e14645
|
@ -21,13 +21,16 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
v1getter "k8s.io/client-go/kubernetes/typed/apps/v1"
|
||||
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
image = "rancher/klipper-lb:v0.1.1"
|
||||
svcNameLabel = "svccontroller.k3s.cattle.io/svcname"
|
||||
Ready = condition.Cond("Ready")
|
||||
image = "rancher/klipper-lb:v0.1.1"
|
||||
svcNameLabel = "svccontroller.k3s.cattle.io/svcname"
|
||||
daemonsetNodeLabel = "svccontroller.k3s.cattle.io/enablelb"
|
||||
nodeSelectorLabel = "svccontroller.k3s.cattle.io/nodeselector"
|
||||
Ready = condition.Cond("Ready")
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -44,12 +47,14 @@ func Register(ctx context.Context, kubernetes kubernetes.Interface, enabled, roo
|
|||
nodeCache: clients.Node.Cache(),
|
||||
podCache: clients.Pod.Cache(),
|
||||
processor: objectset.NewProcessor("svccontroller").
|
||||
Client(appClients.Deployment),
|
||||
Client(appClients.DaemonSet),
|
||||
serviceCache: clients.Service.Cache(),
|
||||
services: kubernetes.CoreV1(),
|
||||
daemonsets: kubernetes.AppsV1(),
|
||||
}
|
||||
|
||||
clients.Service.OnChange(ctx, "svccontroller", h.onChange)
|
||||
clients.Service.OnChange(ctx, "svccontroller", h.onChangeService)
|
||||
clients.Node.OnChange(ctx, "svccontroller", h.onChangeNode)
|
||||
changeset.Watch(ctx, "svccontroller-watcher",
|
||||
h.onResourceChange,
|
||||
clients.Service,
|
||||
|
@ -67,6 +72,7 @@ type handler struct {
|
|||
processor *objectset.Processor
|
||||
serviceCache coreclient.ServiceClientCache
|
||||
services coregetter.ServicesGetter
|
||||
daemonsets v1getter.DaemonSetsGetter
|
||||
}
|
||||
|
||||
func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) ([]changeset.Key, error) {
|
||||
|
@ -101,7 +107,7 @@ func (h *handler) onResourceChange(name, namespace string, obj runtime.Object) (
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (h *handler) onChange(svc *core.Service) (runtime.Object, error) {
|
||||
func (h *handler) onChangeService(svc *core.Service) (runtime.Object, error) {
|
||||
if svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" ||
|
||||
svc.Spec.ClusterIP == "None" {
|
||||
return svc, nil
|
||||
|
@ -116,6 +122,18 @@ func (h *handler) onChange(svc *core.Service) (runtime.Object, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
func (h *handler) onChangeNode(node *core.Node) (runtime.Object, error) {
|
||||
if _, ok := node.Labels[daemonsetNodeLabel]; !ok {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
if err := h.updateDaemonSets(); err != nil {
|
||||
return node, err
|
||||
}
|
||||
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (h *handler) updateService(svc *core.Service) (runtime.Object, error) {
|
||||
pods, err := h.podCache.List(svc.Namespace, labels.SelectorFromSet(map[string]string{
|
||||
svcNameLabel: svc.Name,
|
||||
|
@ -200,43 +218,27 @@ func (h *handler) podIPs(pods []*core.Pod) ([]string, error) {
|
|||
}
|
||||
|
||||
func (h *handler) deployPod(svc *core.Service) error {
|
||||
|
||||
objs := objectset.NewObjectSet()
|
||||
if !h.enabled {
|
||||
return h.processor.NewDesiredSet(svc, objs).Apply()
|
||||
}
|
||||
|
||||
dep, err := h.newDeployment(svc)
|
||||
ds, err := h.newDaemonSet(svc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if dep != nil {
|
||||
objs.Add(dep)
|
||||
if ds != nil {
|
||||
objs.Add(ds)
|
||||
}
|
||||
|
||||
return h.processor.NewDesiredSet(svc, objs).Apply()
|
||||
}
|
||||
|
||||
func (h *handler) newDeployment(svc *core.Service) (*apps.Deployment, error) {
|
||||
func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
|
||||
name := fmt.Sprintf("svclb-%s", svc.Name)
|
||||
zeroInt := intstr.FromInt(0)
|
||||
oneInt := intstr.FromInt(1)
|
||||
replicas := int32(0)
|
||||
|
||||
nodes, err := h.nodeCache.List("", labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
if Ready.IsTrue(node) {
|
||||
replicas++
|
||||
}
|
||||
if replicas >= 2 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
dep := &apps.Deployment{
|
||||
ds := &apps.DaemonSet{
|
||||
ObjectMeta: meta.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: svc.Namespace,
|
||||
|
@ -249,13 +251,15 @@ func (h *handler) newDeployment(svc *core.Service) (*apps.Deployment, error) {
|
|||
Controller: &trueVal,
|
||||
},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
nodeSelectorLabel: "false",
|
||||
},
|
||||
},
|
||||
TypeMeta: meta.TypeMeta{
|
||||
Kind: "Deployment",
|
||||
Kind: "DaemonSet",
|
||||
APIVersion: "apps/v1",
|
||||
},
|
||||
Spec: apps.DeploymentSpec{
|
||||
Replicas: &replicas,
|
||||
Spec: apps.DaemonSetSpec{
|
||||
Selector: &meta.LabelSelector{
|
||||
MatchLabels: map[string]string{
|
||||
"app": name,
|
||||
|
@ -269,10 +273,9 @@ func (h *handler) newDeployment(svc *core.Service) (*apps.Deployment, error) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Strategy: apps.DeploymentStrategy{
|
||||
Type: apps.RollingUpdateDeploymentStrategyType,
|
||||
RollingUpdate: &apps.RollingUpdateDeployment{
|
||||
MaxSurge: &zeroInt,
|
||||
UpdateStrategy: apps.DaemonSetUpdateStrategy{
|
||||
Type: apps.RollingUpdateDaemonSetStrategyType,
|
||||
RollingUpdate: &apps.RollingUpdateDaemonSet{
|
||||
MaxUnavailable: &oneInt,
|
||||
},
|
||||
},
|
||||
|
@ -319,8 +322,44 @@ func (h *handler) newDeployment(svc *core.Service) (*apps.Deployment, error) {
|
|||
},
|
||||
}
|
||||
|
||||
dep.Spec.Template.Spec.Containers = append(dep.Spec.Template.Spec.Containers, container)
|
||||
ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
|
||||
}
|
||||
// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
|
||||
selector, err := labels.Parse(daemonsetNodeLabel)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodesWithLabel, err := h.nodeCache.List("", selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(nodesWithLabel) > 0 {
|
||||
ds.Spec.Template.Spec.NodeSelector = map[string]string{
|
||||
daemonsetNodeLabel: "true",
|
||||
}
|
||||
ds.Labels[nodeSelectorLabel] = "true"
|
||||
}
|
||||
return ds, nil
|
||||
}
|
||||
|
||||
func (h *handler) updateDaemonSets() error {
|
||||
daemonsets, err := h.daemonsets.DaemonSets("").List(meta.ListOptions{
|
||||
LabelSelector: nodeSelectorLabel + "=false",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return dep, nil
|
||||
for _, ds := range daemonsets.Items {
|
||||
ds.Spec.Template.Spec.NodeSelector = map[string]string{
|
||||
daemonsetNodeLabel: "true",
|
||||
}
|
||||
ds.Labels[nodeSelectorLabel] = "true"
|
||||
if _, err := h.daemonsets.DaemonSets(ds.Namespace).Update(&ds); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue