diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD
index b378147969..87e3da69e9 100644
--- a/pkg/proxy/userspace/BUILD
+++ b/pkg/proxy/userspace/BUILD
@@ -23,6 +23,7 @@ go_library(
         "//pkg/proxy:go_default_library",
         "//pkg/proxy/config:go_default_library",
         "//pkg/proxy/util:go_default_library",
+        "//pkg/util/async:go_default_library",
        "//pkg/util/conntrack:go_default_library",
         "//pkg/util/iptables:go_default_library",
         "//pkg/util/slice:go_default_library",
@@ -86,6 +87,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/utils/exec:go_default_library",
         "//vendor/k8s.io/utils/exec/testing:go_default_library",
     ],
diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go
index 698ae65a9b..8db24f0da9 100644
--- a/pkg/proxy/userspace/proxier.go
+++ b/pkg/proxy/userspace/proxier.go
@@ -19,6 +19,7 @@ package userspace
 import (
 	"fmt"
 	"net"
+	"reflect"
 	"strconv"
 	"strings"
 	"sync"
@@ -35,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/proxy"
 	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
+	"k8s.io/kubernetes/pkg/util/async"
 	"k8s.io/kubernetes/pkg/util/conntrack"
 	"k8s.io/kubernetes/pkg/util/iptables"
 	utilexec "k8s.io/utils/exec"
@@ -91,6 +93,19 @@ func logTimeout(err error) bool {
 // ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port
 type ProxySocketFunc func(protocol v1.Protocol, ip net.IP, port int) (ProxySocket, error)
 
+const numBurstSyncs int = 2
+
+type serviceChange struct {
+	current  *v1.Service
+	previous *v1.Service
+}
+
+// Interface for async runner; abstracted for testing
+type asyncRunnerInterface interface {
+	Run()
+	Loop(<-chan struct{})
+}
+
 // Proxier is a simple proxy for TCP connections between a localhost:lport
 // and services that provide the actual implementations.
 type Proxier struct {
@@ -98,7 +113,7 @@ type Proxier struct {
 	mu             sync.Mutex // protects serviceMap
 	serviceMap     map[proxy.ServicePortName]*ServiceInfo
 	syncPeriod     time.Duration
-	minSyncPeriod  time.Duration // unused atm, but plumbed through
+	minSyncPeriod  time.Duration
 	udpIdleTimeout time.Duration
 	portMapMutex   sync.Mutex
 	portMap        map[portMapKey]*portMapValue
@@ -115,6 +130,10 @@ type Proxier struct {
 	endpointsSynced int32
 	servicesSynced  int32
 	initialized     int32
+	// protects serviceChanges
+	serviceChangesLock sync.Mutex
+	serviceChanges     map[types.NamespacedName]*serviceChange // map of service changes
+	syncRunner         asyncRunnerInterface                    // governs calls to syncProxyRules
 	stopChan        chan struct{}
 }
 
@@ -210,12 +229,12 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
 	if err := iptablesFlush(iptables); err != nil {
 		return nil, fmt.Errorf("failed to flush iptables: %v", err)
 	}
-	return &Proxier{
-		loadBalancer: loadBalancer,
-		serviceMap:   make(map[proxy.ServicePortName]*ServiceInfo),
-		portMap:      make(map[portMapKey]*portMapValue),
-		syncPeriod:   syncPeriod,
-		// plumbed through if needed, not used atm.
+	proxier := &Proxier{
+		loadBalancer:   loadBalancer,
+		serviceMap:     make(map[proxy.ServicePortName]*ServiceInfo),
+		serviceChanges: make(map[types.NamespacedName]*serviceChange),
+		portMap:        make(map[portMapKey]*portMapValue),
+		syncPeriod:     syncPeriod,
 		minSyncPeriod:  minSyncPeriod,
 		udpIdleTimeout: udpIdleTimeout,
 		listenIP:       listenIP,
@@ -225,7 +244,10 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
 		makeProxySocket: makeProxySocket,
 		exec:            exec,
 		stopChan:        make(chan struct{}),
-	}, nil
+	}
+	klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, numBurstSyncs)
+	proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs)
+	return proxier, nil
 }
 
 // CleanupLeftovers removes all iptables rules and chains created by the Proxier
@@ -299,14 +321,13 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) {
 // shutdown closes all service port proxies and returns from the proxy's
 // sync loop. Used from testcases.
 func (proxier *Proxier) shutdown() {
-	defer proxier.cleanupStaleStickySessions()
-
 	proxier.mu.Lock()
 	defer proxier.mu.Unlock()
 
 	for serviceName, info := range proxier.serviceMap {
-		proxier.stopProxyInternal(serviceName, info)
+		proxier.stopProxy(serviceName, info)
 	}
+	proxier.cleanupStaleStickySessions()
 	close(proxier.stopChan)
 }
 
@@ -314,34 +335,52 @@ func (proxier *Proxier) isInitialized() bool {
 	return atomic.LoadInt32(&proxier.initialized) > 0
 }
 
-// Sync is called to immediately synchronize the proxier state to iptables
+// Sync is called to synchronize the proxier state to iptables as soon as possible.
 func (proxier *Proxier) Sync() {
+	proxier.syncRunner.Run()
+}
+
+func (proxier *Proxier) syncProxyRules() {
+	start := time.Now()
+	defer func() {
+		klog.V(2).Infof("userspace syncProxyRules took %v", time.Since(start))
+	}()
+
+	// don't sync rules till we've received services and endpoints
+	if !proxier.isInitialized() {
+		klog.V(2).Info("Not syncing userspace proxy until Services and Endpoints have been received from master")
+		return
+	}
+
 	if err := iptablesInit(proxier.iptables); err != nil {
 		klog.Errorf("Failed to ensure iptables: %v", err)
 	}
+
+	proxier.serviceChangesLock.Lock()
+	changes := proxier.serviceChanges
+	proxier.serviceChanges = make(map[types.NamespacedName]*serviceChange)
+	proxier.serviceChangesLock.Unlock()
+
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+
+	klog.V(2).Infof("userspace proxy: processing %d service events", len(changes))
+	for _, change := range changes {
+		existingPorts := proxier.mergeService(change.current)
+		proxier.unmergeService(change.previous, existingPorts)
+	}
+
 	proxier.ensurePortals()
 	proxier.cleanupStaleStickySessions()
 }
 
 // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
 func (proxier *Proxier) SyncLoop() {
-	t := time.NewTicker(proxier.syncPeriod)
-	defer t.Stop()
-	for {
-		select {
-		case <-t.C:
-			klog.V(6).Infof("Periodic sync")
-			proxier.Sync()
-		case <-proxier.stopChan:
-			return
-		}
-	}
+	proxier.syncRunner.Loop(proxier.stopChan)
 }
 
 // Ensure that portals exist for all services.
 func (proxier *Proxier) ensurePortals() {
-	proxier.mu.Lock()
-	defer proxier.mu.Unlock()
 	// NB: This does not remove rules that should not be present.
 	for name, info := range proxier.serviceMap {
 		err := proxier.openPortal(name, info)
@@ -353,22 +392,12 @@ func (proxier *Proxier) ensurePortals() {
 
 // clean up any stale sticky session records in the hash map.
 func (proxier *Proxier) cleanupStaleStickySessions() {
-	proxier.mu.Lock()
-	defer proxier.mu.Unlock()
 	for name := range proxier.serviceMap {
 		proxier.loadBalancer.CleanupStaleStickySessions(name)
 	}
 }
 
-// This assumes proxier.mu is not locked.
 func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error {
-	proxier.mu.Lock()
-	defer proxier.mu.Unlock()
-	return proxier.stopProxyInternal(service, info)
-}
-
-// This assumes proxier.mu is locked.
-func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *ServiceInfo) error {
 	delete(proxier.serviceMap, service)
 	info.setAlive(false)
 	err := info.socket.Close()
@@ -384,16 +413,18 @@ func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceI
 	return info, ok
 }
 
-func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *ServiceInfo) {
+// addServiceOnPort locks the proxy before calling addServiceOnPortInternal.
+// Used from testcases.
+func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
 	proxier.mu.Lock()
 	defer proxier.mu.Unlock()
-	proxier.serviceMap[service] = info
+	return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout)
 }
 
-// addServiceOnPort starts listening for a new service, returning the ServiceInfo.
+// addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo.
 // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
 // connections, for now.
-func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
+func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
 	sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
 	if err != nil {
 		return nil, err
@@ -417,7 +448,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
 		socket:              sock,
 		sessionAffinityType: v1.ServiceAffinityNone, // default
 	}
-	proxier.setServiceInfo(service, si)
+	proxier.serviceMap[service] = si
 
 	klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
 	go func(service proxy.ServicePortName, proxier *Proxier) {
@@ -444,7 +475,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
 		servicePort := &service.Spec.Ports[i]
 		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
 		existingPorts.Insert(servicePort.Name)
-		info, exists := proxier.getServiceInfo(serviceName)
+		info, exists := proxier.serviceMap[serviceName]
 		// TODO: check health of the socket? What if ProxyLoop exited?
 		if exists && sameConfig(info, service, servicePort) {
 			// Nothing changed.
@@ -467,7 +498,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
 
 		serviceIP := net.ParseIP(service.Spec.ClusterIP)
 		klog.V(1).Infof("Adding new service %q at %s/%s", serviceName, net.JoinHostPort(serviceIP.String(), strconv.Itoa(int(servicePort.Port))), servicePort.Protocol)
-		info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
+		info, err = proxier.addServiceOnPortInternal(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
 		if err != nil {
 			klog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
 			continue
@@ -504,10 +535,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
 		klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
 		return
 	}
-
 	staleUDPServices := sets.NewString()
-	proxier.mu.Lock()
-	defer proxier.mu.Unlock()
 	for i := range service.Spec.Ports {
 		servicePort := &service.Spec.Ports[i]
 		if existingPorts.Has(servicePort.Name) {
@@ -529,7 +557,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
 		if err := proxier.closePortal(serviceName, info); err != nil {
 			klog.Errorf("Failed to close portal for %q: %v", serviceName, err)
 		}
-		if err := proxier.stopProxyInternal(serviceName, info); err != nil {
+		if err := proxier.stopProxy(serviceName, info); err != nil {
 			klog.Errorf("Failed to stop service %q: %v", serviceName, err)
 		}
 		proxier.loadBalancer.DeleteService(serviceName)
@@ -541,17 +569,50 @@
 	}
 }
 
+func (proxier *Proxier) serviceChange(previous, current *v1.Service, detail string) {
+	var svcName types.NamespacedName
+	if current != nil {
+		svcName = types.NamespacedName{Namespace: current.Namespace, Name: current.Name}
+	} else {
+		svcName = types.NamespacedName{Namespace: previous.Namespace, Name: previous.Name}
+	}
+	klog.V(4).Infof("userspace proxy: %s for %s", detail, svcName)
+
+	proxier.serviceChangesLock.Lock()
+	defer proxier.serviceChangesLock.Unlock()
+
+	change, exists := proxier.serviceChanges[svcName]
+	if !exists {
+		// change.previous is only set for new changes. We must keep
+		// the oldest service info (or nil) because correct unmerging
+		// depends on the next update/del after a merge, not subsequent
+		// updates.
+		change = &serviceChange{previous: previous}
+		proxier.serviceChanges[svcName] = change
+	}
+
+	// Always use the most current service (or nil) as change.current
+	change.current = current
+
+	if reflect.DeepEqual(change.previous, change.current) {
+		// collapsed change had no effect
+		delete(proxier.serviceChanges, svcName)
+	} else if proxier.isInitialized() {
+		// change will have an effect, ask the proxy to sync
+		proxier.syncRunner.Run()
+	}
+}
+
 func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
-	_ = proxier.mergeService(service)
+	proxier.serviceChange(nil, service, "OnServiceAdd")
 }
 
 func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
-	existingPorts := proxier.mergeService(service)
-	proxier.unmergeService(oldService, existingPorts)
+	proxier.serviceChange(oldService, service, "OnServiceUpdate")
 }
 
 func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
-	proxier.unmergeService(service, sets.NewString())
+	proxier.serviceChange(service, nil, "OnServiceDelete")
 }
 
 func (proxier *Proxier) OnServiceSynced() {
@@ -563,6 +624,11 @@ func (proxier *Proxier) OnServiceSynced() {
 	if atomic.LoadInt32(&proxier.endpointsSynced) > 0 {
 		atomic.StoreInt32(&proxier.initialized, 1)
 	}
+
+	// Must sync from a goroutine to avoid blocking the
+	// service event handler on startup with large numbers
+	// of initial objects
+	go proxier.syncProxyRules()
 }
 
 func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
@@ -587,6 +653,11 @@ func (proxier *Proxier) OnEndpointsSynced() {
 	if atomic.LoadInt32(&proxier.servicesSynced) > 0 {
 		atomic.StoreInt32(&proxier.initialized, 1)
 	}
+
+	// Must sync from a goroutine to avoid blocking the
+	// service event handler on startup with large numbers
+	// of initial objects
+	go proxier.syncProxyRules()
 }
 
 func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool {
diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go
index 9488c1bb21..e76400d114 100644
--- a/pkg/proxy/userspace/proxier_test.go
+++ b/pkg/proxy/userspace/proxier_test.go
@@ -24,6 +24,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"os"
+	"reflect"
 	"strconv"
 	"sync/atomic"
 	"testing"
@@ -33,6 +34,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/proxy"
 	ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
 	"k8s.io/utils/exec"
@@ -86,6 +88,16 @@ func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
 	return fmt.Errorf("port %d still open", proxyPort)
 }
 
+func waitForServiceInfo(p *Proxier, service proxy.ServicePortName) (*ServiceInfo, bool) {
+	var svcInfo *ServiceInfo
+	var exists bool
+	wait.PollImmediate(50*time.Millisecond, 3*time.Second, func() (bool, error) {
+		svcInfo, exists = p.getServiceInfo(service)
+		return exists, nil
+	})
+	return svcInfo, exists
+}
+
 // udpEchoServer is a simple echo server in UDP, intended for testing the proxy.
 type udpEchoServer struct {
 	net.PacketConn
@@ -225,6 +237,15 @@ func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time
 	t.Errorf("expected %d ProxyClients live, got %d", want, got)
 }
 
+func startProxier(p *Proxier, t *testing.T) {
+	go func() {
+		p.SyncLoop()
+	}()
+	waitForNumProxyLoops(t, p, 0)
+	p.OnServiceSynced()
+	p.OnEndpointsSynced()
+}
+
 func TestTCPProxy(t *testing.T) {
 	lb := NewLoadBalancerRR()
 	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
@@ -242,7 +263,7 @@ func TestTCPProxy(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@@ -270,7 +291,7 @@ func TestUDPProxy(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@@ -298,7 +319,7 @@ func TestUDPProxyTimeout(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@@ -338,7 +359,7 @@ func TestMultiPortProxy(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
@@ -368,7 +389,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	p.OnServiceAdd(&v1.Service{
@@ -384,7 +405,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
 		}}},
 	})
 	waitForNumProxyLoops(t, p, 2)
-	svcInfo, exists := p.getServiceInfo(serviceP)
+	svcInfo, exists := waitForServiceInfo(p, serviceP)
 	if !exists {
 		t.Fatalf("can't find serviceInfo for %s", serviceP)
 	}
@@ -392,7 +413,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
 		t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo)
 	}
 
-	svcInfo, exists = p.getServiceInfo(serviceQ)
+	svcInfo, exists = waitForServiceInfo(p, serviceQ)
 	if !exists {
 		t.Fatalf("can't find serviceInfo for %s", serviceQ)
 	}
@@ -408,7 +429,9 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
 
 // Helper: Stops the proxy for the named service.
 func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
-	info, found := proxier.getServiceInfo(service)
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	info, found := proxier.serviceMap[service]
 	if !found {
 		return fmt.Errorf("unknown service: %s", service)
 	}
@@ -432,7 +455,7 @@ func TestTCPProxyStop(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@@ -477,7 +500,7 @@ func TestUDPProxyStop(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@@ -501,9 +524,9 @@ func TestUDPProxyStop(t *testing.T) {
 
 func TestTCPProxyUpdateDelete(t *testing.T) {
 	lb := NewLoadBalancerRR()
-	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
+	servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 	lb.OnEndpointsAdd(&v1.Endpoints{
-		ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
+		ObjectMeta: metav1.ObjectMeta{Namespace: servicePortName.Namespace, Name: servicePortName.Name},
 		Subsets: []v1.EndpointSubset{{
 			Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
 			Ports:     []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
@@ -516,29 +539,22 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
-	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
-	if err != nil {
-		t.Fatalf("error adding new service: %#v", err)
-	}
-	conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
-	if err != nil {
-		t.Fatalf("error connecting to proxy: %v", err)
-	}
-	conn.Close()
-	waitForNumProxyLoops(t, p, 1)
-
-	p.OnServiceDelete(&v1.Service{
-		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+	service := &v1.Service{
+		ObjectMeta: metav1.ObjectMeta{Name: servicePortName.Name, Namespace: servicePortName.Namespace},
 		Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
 			Name:     "p",
-			Port:     int32(svcInfo.proxyPort),
+			Port:     9997,
 			Protocol: "TCP",
 		}}},
-	})
-	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
+	}
+
+	p.OnServiceAdd(service)
+	waitForNumProxyLoops(t, p, 1)
+	p.OnServiceDelete(service)
+	if err := waitForClosedPortTCP(p, int(service.Spec.Ports[0].Port)); err != nil {
 		t.Fatalf(err.Error())
 	}
 	waitForNumProxyLoops(t, p, 0)
@@ -561,7 +577,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@@ -607,7 +623,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@@ -644,7 +660,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 			Protocol: "TCP",
 		}}},
 	})
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo for %s", service)
 	}
@@ -670,7 +686,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@@ -707,7 +723,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 			Protocol: "UDP",
 		}}},
 	})
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo")
 	}
@@ -732,7 +748,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@@ -754,7 +770,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
 	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 		t.Fatalf(err.Error())
 	}
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo")
 	}
@@ -781,7 +797,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@@ -802,7 +818,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
 	if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
 		t.Fatalf(err.Error())
 	}
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo")
 	}
@@ -827,7 +843,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@@ -853,7 +869,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
 	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 		t.Fatalf(err.Error())
 	}
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo")
 	}
@@ -881,7 +897,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
 	defer p.shutdown()
 
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@@ -909,7 +925,16 @@ func TestProxyUpdatePortal(t *testing.T) {
 		}}},
 	}
 	p.OnServiceUpdate(svcv0, svcv1)
-	_, exists := p.getServiceInfo(service)
+
+	// Wait for the service to be removed because it had an empty ClusterIP
+	var exists bool
+	for i := 0; i < 50; i++ {
+		_, exists = p.getServiceInfo(service)
+		if !exists {
+			break
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
 	if exists {
 		t.Fatalf("service with empty ClusterIP should not be included in the proxy")
 	}
@@ -938,7 +963,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 	}
 	p.OnServiceUpdate(svcv2, svcv3)
 	lb.OnEndpointsAdd(endpoint)
-	svcInfo, exists = p.getServiceInfo(service)
+	svcInfo, exists = waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("service with ClusterIP set not found in the proxy")
 	}
@@ -946,6 +971,172 @@ func TestProxyUpdatePortal(t *testing.T) {
 	waitForNumProxyLoops(t, p, 1)
 }
 
+type fakeRunner struct{}
+
+// assert fakeRunner implements asyncRunnerInterface
+var _ asyncRunnerInterface = &fakeRunner{}
+
+func (f fakeRunner) Run() {
+}
+
+func (f fakeRunner) Loop(stop <-chan struct{}) {
+}
+
+func TestOnServiceAddChangeMap(t *testing.T) {
+	fexec := makeFakeExec()
+
+	// Use long minSyncPeriod so we can test that immediate syncs work
+	p, err := createProxier(NewLoadBalancerRR(), net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Minute, udpIdleTimeoutForTest, newProxySocket)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Fake out sync runner
+	p.syncRunner = fakeRunner{}
+
+	serviceMeta := metav1.ObjectMeta{Namespace: "testnamespace", Name: "testname"}
+	service := &v1.Service{
+		ObjectMeta: serviceMeta,
+		Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
+			Name:     "p",
+			Port:     99,
+			Protocol: "TCP",
+		}}},
+	}
+
+	serviceUpdate := &v1.Service{
+		ObjectMeta: serviceMeta,
+		Spec: v1.ServiceSpec{ClusterIP: "1.2.3.5", Ports: []v1.ServicePort{{
+			Name:     "p",
+			Port:     100,
+			Protocol: "TCP",
+		}}},
+	}
+
+	serviceUpdate2 := &v1.Service{
+		ObjectMeta: serviceMeta,
+		Spec: v1.ServiceSpec{ClusterIP: "1.2.3.6", Ports: []v1.ServicePort{{
+			Name:     "p",
+			Port:     101,
+			Protocol: "TCP",
+		}}},
+	}
+
+	type onServiceTest struct {
+		detail         string
+		changes        []serviceChange
+		expectedChange *serviceChange
+	}
+
+	tests := []onServiceTest{
+		{
+			detail: "add",
+			changes: []serviceChange{
+				{current: service},
+			},
+			expectedChange: &serviceChange{
+				current: service,
+			},
+		},
+		{
+			detail: "add+update=add",
+			changes: []serviceChange{
+				{current: service},
+				{
+					previous: service,
+					current:  serviceUpdate,
+				},
+			},
+			expectedChange: &serviceChange{
+				current: serviceUpdate,
+			},
+		},
+		{
+			detail: "add+del=none",
+			changes: []serviceChange{
+				{current: service},
+				{previous: service},
+			},
+		},
+		{
+			detail: "update+update=update",
+			changes: []serviceChange{
+				{
+					previous: service,
+					current:  serviceUpdate,
+				},
+				{
+					previous: serviceUpdate,
+					current:  serviceUpdate2,
+				},
+			},
+			expectedChange: &serviceChange{
+				previous: service,
+				current:  serviceUpdate2,
+			},
+		},
+		{
+			detail: "update+del=del",
+			changes: []serviceChange{
+				{
+					previous: service,
+					current:  serviceUpdate,
+				},
+				{previous: serviceUpdate},
+			},
+			// change collapsing always keeps the oldest service
+			// info since correct unmerging depends on the least
+			// recent update, not the most current.
+			expectedChange: &serviceChange{
+				previous: service,
+			},
+		},
+		{
+			detail: "del+add=update",
+			changes: []serviceChange{
+				{previous: service},
+				{current: serviceUpdate},
+			},
+			expectedChange: &serviceChange{
+				previous: service,
+				current:  serviceUpdate,
+			},
+		},
+	}
+
+	for _, test := range tests {
+		for _, change := range test.changes {
+			p.serviceChange(change.previous, change.current, test.detail)
+		}
+
+		if test.expectedChange != nil {
+			if len(p.serviceChanges) != 1 {
+				t.Fatalf("[%s] expected 1 service change but found %d", test.detail, len(p.serviceChanges))
+			}
+			expectedService := test.expectedChange.current
+			if expectedService == nil {
+				expectedService = test.expectedChange.previous
+			}
+			svcName := types.NamespacedName{Namespace: expectedService.Namespace, Name: expectedService.Name}
+
+			change, ok := p.serviceChanges[svcName]
+			if !ok {
+				t.Fatalf("[%s] did not find service change for %v", test.detail, svcName)
+			}
+			if !reflect.DeepEqual(change.previous, test.expectedChange.previous) {
+				t.Fatalf("[%s] change previous service and expected previous service don't match\nchange: %+v\nexp: %+v", test.detail, change.previous, test.expectedChange.previous)
+			}
+			if !reflect.DeepEqual(change.current, test.expectedChange.current) {
+				t.Fatalf("[%s] change current service and expected current service don't match\nchange: %+v\nexp: %+v", test.detail, change.current, test.expectedChange.current)
+			}
+		} else {
+			if len(p.serviceChanges) != 0 {
+				t.Fatalf("[%s] expected no service changes but found %d", test.detail, len(p.serviceChanges))
+			}
+		}
+	}
+}
+
 func makeFakeExec() *fakeexec.FakeExec {
 	fcmd := fakeexec.FakeCmd{
 		CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{