Kube-proxy: REJECT LB IPs with no endpoints

We REJECT every other case.  Close this FIXME.

To get this to work in all cases, we have to process service in
filter.INPUT, since LB IPS might be manged as local addresses.
pull/564/head
Tim Hockin 2019-02-18 23:52:24 -08:00
parent 382f5c83c0
commit de25d6cb95
3 changed files with 105 additions and 7 deletions

View File

@ -369,6 +369,7 @@ var iptablesJumpChains = []iptablesJumpChain{
{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainInput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil},
{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil},
@ -847,6 +848,7 @@ func (proxier *Proxier) syncProxyRules() {
}
writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
} else {
// No endpoints.
writeLine(proxier.filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
@ -917,6 +919,7 @@ func (proxier *Proxier) syncProxyRules() {
// This covers cases like GCE load-balancers which get added to the local routing table.
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
} else {
// No endpoints.
writeLine(proxier.filterRules,
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
@ -929,10 +932,10 @@ func (proxier *Proxier) syncProxyRules() {
}
// Capture load-balancer ingress.
if hasEndpoints {
fwChain := svcInfo.serviceFirewallChainName
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
if ingress.IP != "" {
fwChain := svcInfo.serviceFirewallChainName
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
if ingress.IP != "" {
if hasEndpoints {
// create service firewall chain
if chain, ok := existingNATChains[fwChain]; ok {
writeBytesLine(proxier.natChains, chain)
@ -993,10 +996,19 @@ func (proxier *Proxier) syncProxyRules() {
// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
// It means the packet cannot go thru the firewall, then mark it for DROP
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
} else {
// No endpoints.
writeLine(proxier.filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)),
"--dport", strconv.Itoa(svcInfo.Port),
"-j", "REJECT",
)
}
}
}
// FIXME: do we need REJECT rules for load-balancer ingress if !hasEndpoints?
// Capture nodeports. If we had more than 2 rules it might be
// worthwhile to make a new per-service chain for nodeport rules, but
@ -1078,6 +1090,7 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
}
} else {
// No endpoints.
writeLine(proxier.filterRules,
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),

View File

@ -738,6 +738,28 @@ func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.Replicati
return result
}
func (j *ServiceTestJig) Scale(namespace string, replicas int) {
rc := j.Name
scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{})
if err != nil {
Failf("Failed to get scale for RC %q: %v", rc, err)
}
scale.Spec.Replicas = int32(replicas)
_, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale)
if err != nil {
Failf("Failed to scale RC %q: %v", rc, err)
}
pods, err := j.waitForPodsCreated(namespace, replicas)
if err != nil {
Failf("Failed waiting for pods: %v", err)
}
if err := j.waitForPodsReady(namespace, pods); err != nil {
Failf("Failed waiting for pods to be running: %v", err)
}
return
}
func (j *ServiceTestJig) waitForPdbReady(namespace string) error {
timeout := 2 * time.Minute
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
@ -911,6 +933,20 @@ func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout tim
}
}
func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := PokeHTTP(host, port, "/", nil)
if result.Status == HTTPRefused {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("HTTP service %v:%v not rejected: %v", host, port, err)
}
}
func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := PokeUDP(host, port, "echo hello", &UDPPokeParams{
@ -941,6 +977,19 @@ func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time
}
}
func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
if result.Status == UDPRefused {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("UDP service %v:%v not rejected: %v", host, port, err)
}
}
func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
var body bytes.Buffer
if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {

View File

@ -791,11 +791,47 @@ var _ = SIGDescribe("Services", func() {
jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout)
By("hitting the TCP service's LoadBalancer")
jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB
jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
if loadBalancerSupportsUDP {
By("hitting the UDP service's LoadBalancer")
jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB)
jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
}
By("Scaling the pods to 0")
jig.Scale(ns1, 0)
jig.Scale(ns2, 0)
By("looking for ICMP REJECT on the TCP service's NodePort")
jig.TestRejectedHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout)
By("looking for ICMP REJECT on the UDP service's NodePort")
jig.TestRejectedUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout)
By("looking for ICMP REJECT on the TCP service's LoadBalancer")
jig.TestRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
if loadBalancerSupportsUDP {
By("looking for ICMP REJECT on the UDP service's LoadBalancer")
jig.TestRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
}
By("Scaling the pods to 1")
jig.Scale(ns1, 1)
jig.Scale(ns2, 1)
By("hitting the TCP service's NodePort")
jig.TestReachableHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout)
By("hitting the UDP service's NodePort")
jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout)
By("hitting the TCP service's LoadBalancer")
jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
if loadBalancerSupportsUDP {
By("hitting the UDP service's LoadBalancer")
jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
}
// Change the services back to ClusterIP.