From 0777ecd030a296ba93188e2e7341af617e7ff21c Mon Sep 17 00:00:00 2001
From: Tim Hockin
Date: Fri, 2 Dec 2016 10:54:12 -0800
Subject: [PATCH] Fix race in service IP allocation repair loop

The repair loop rebuilds the IP allocation map from the list of
Services and writes it back, so an IP that was just allocated but not
yet attached to a persisted Service could look leaked and be released
for reuse. Instead of releasing suspected leaks immediately, count how
many consecutive repair passes each one has been seen in, and only
clean it up after numRepairsBeforeLeakCleanup passes.

---
 hack/update-openapi-spec.sh                   |  2 +
 .../core/service/ipallocator/allocator.go     | 23 ++++++
 .../service/ipallocator/allocator_test.go     | 51 +++++++++++++
 .../service/ipallocator/controller/repair.go  | 76 +++++++++++++++----
 .../ipallocator/controller/repair_test.go     | 37 ++++++---
 5 files changed, 164 insertions(+), 25 deletions(-)

diff --git a/hack/update-openapi-spec.sh b/hack/update-openapi-spec.sh
index 0723f40e4a..b0016bc29b 100755
--- a/hack/update-openapi-spec.sh
+++ b/hack/update-openapi-spec.sh
@@ -65,6 +65,8 @@ kube::log::status "Starting kube-apiserver"
   --cert-dir="${TMP_DIR}/certs" \
   --runtime-config="api/all=true" \
   --token-auth-file=$TMP_DIR/tokenauth.csv \
+  --logtostderr \
+  --v=2 \
   --service-cluster-ip-range="10.0.0.0/24" >/tmp/openapi-api-server.log 2>&1 &
 APISERVER_PID=$!
diff --git a/pkg/registry/core/service/ipallocator/allocator.go b/pkg/registry/core/service/ipallocator/allocator.go
index 2ef4c6edda..7adb30101b 100644
--- a/pkg/registry/core/service/ipallocator/allocator.go
+++ b/pkg/registry/core/service/ipallocator/allocator.go
@@ -90,6 +90,19 @@ func NewCIDRRange(cidr *net.IPNet) *Range {
     })
 }
 
+// NewFromSnapshot allocates a Range and initializes it from a snapshot.
+func NewFromSnapshot(snap *api.RangeAllocation) (*Range, error) {
+    _, ipnet, err := net.ParseCIDR(snap.Range)
+    if err != nil {
+        return nil, err
+    }
+    r := NewCIDRRange(ipnet)
+    if err := r.Restore(ipnet, snap.Data); err != nil {
+        return nil, err
+    }
+    return r, nil
+}
+
 func maximum(a, b int) int {
     if a > b {
         return a
@@ -102,6 +115,16 @@ func (r *Range) Free() int {
     return r.alloc.Free()
 }
 
+// Used returns the count of IP addresses used in the range.
+func (r *Range) Used() int {
+    return r.max - r.alloc.Free()
+}
+
+// CIDR returns the CIDR covered by the range.
+func (r *Range) CIDR() net.IPNet {
+    return *r.net
+}
+
 // Allocate attempts to reserve the provided IP. ErrNotInRange or
 // ErrAllocated will be returned if the IP is not valid for this range
 // or has already been reserved. ErrFull will be returned if there
diff --git a/pkg/registry/core/service/ipallocator/allocator_test.go b/pkg/registry/core/service/ipallocator/allocator_test.go
index f87707cd77..37f72f79df 100644
--- a/pkg/registry/core/service/ipallocator/allocator_test.go
+++ b/pkg/registry/core/service/ipallocator/allocator_test.go
@@ -34,6 +34,9 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 254 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 0 {
+        t.Errorf("unexpected used %d", f)
+    }
     found := sets.NewString()
     count := 0
     for r.Free() > 0 {
@@ -61,6 +64,9 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 1 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 253 {
+        t.Errorf("unexpected used %d", f)
+    }
     ip, err := r.AllocateNext()
     if err != nil {
         t.Fatal(err)
@@ -87,12 +93,18 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 1 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 253 {
+        t.Errorf("unexpected used %d", f)
+    }
     if err := r.Allocate(released); err != nil {
         t.Fatal(err)
     }
     if f := r.Free(); f != 0 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 254 {
+        t.Errorf("unexpected used %d", f)
+    }
 }
 
 func TestAllocateTiny(t *testing.T) {
@@ -256,3 +268,42 @@ func TestSnapshot(t *testing.T) {
         t.Errorf("counts do not match: %d", other.Free())
     }
 }
+
+func TestNewFromSnapshot(t *testing.T) {
+    _, cidr, err := net.ParseCIDR("192.168.0.0/24")
+    if err != nil {
+        t.Fatal(err)
+    }
+    r := NewCIDRRange(cidr)
+    allocated := []net.IP{}
+    for i := 0; i < 128; i++ {
+        ip, err := r.AllocateNext()
+        if err != nil {
+            t.Fatal(err)
+        }
+        allocated = append(allocated, ip)
+    }
+
+    snapshot := api.RangeAllocation{}
+    if err = r.Snapshot(&snapshot); err != nil {
+        t.Fatal(err)
+    }
+
+    r, err = NewFromSnapshot(&snapshot)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if x := r.Free(); x != 126 {
+        t.Fatalf("expected 126 free IPs, got %d", x)
+    }
+    if x := r.Used(); x != 128 {
+        t.Fatalf("expected 128 used IPs, got %d", x)
+    }
+
+    for _, ip := range allocated {
+        if !r.Has(ip) {
+            t.Fatalf("expected IP %s to be allocated, but it was not", ip)
+        }
+    }
+}
diff --git a/pkg/registry/core/service/ipallocator/controller/repair.go b/pkg/registry/core/service/ipallocator/controller/repair.go
index 4360689981..6095a91c83 100644
--- a/pkg/registry/core/service/ipallocator/controller/repair.go
+++ b/pkg/registry/core/service/ipallocator/controller/repair.go
@@ -51,8 +51,13 @@ type Repair struct {
     serviceClient coreclient.ServicesGetter
     network       *net.IPNet
     alloc         rangeallocation.RangeRegistry
+    leaks         map[string]int // counter per leaked IP
 }
 
+// How many times we need to detect a leak before we clean up. This is to
+// avoid races between allocating an IP and using it.
+const numRepairsBeforeLeakCleanup = 3
+
 // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
 // and generates informational warnings for a cluster that is not in sync.
 func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
@@ -61,6 +66,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter,
         serviceClient: serviceClient,
         network:       network,
         alloc:         alloc,
+        leaks:         map[string]int{},
     }
 }
 
@@ -89,18 +95,27 @@ func (c *Repair) runOnce() error {
     // If etcd server is not running we should wait for some time and fail only then. This is particularly
     // important when we start apiserver and etcd at the same time.
-    var latest *api.RangeAllocation
-    var err error
-    err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
-        latest, err = c.alloc.Get()
+    var snapshot *api.RangeAllocation
+    err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
+        var err error
+        snapshot, err = c.alloc.Get()
         return err == nil, err
     })
     if err != nil {
         return fmt.Errorf("unable to refresh the service IP block: %v", err)
     }
+    // If not yet initialized, default to the configured range.
+    if snapshot.Range == "" {
+        snapshot.Range = c.network.String()
+    }
+    // Rebuild the stored state into an allocator; easier to work with than the raw snapshot.
+    stored, err := ipallocator.NewFromSnapshot(snapshot)
+    if err != nil {
+        return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
+    }
     // We explicitly send no resource version, since the resource version
-    // of 'latest' is from a different collection, it's not comparable to
+    // of 'snapshot' is from a different collection, it's not comparable to
     // the service collection. The caching layer keeps per-collection RVs,
     // and this is proper, since in theory the collections could be hosted
     // in separate etcd (or even non-etcd) instances.
@@ -109,40 +124,73 @@ func (c *Repair) runOnce() error {
         return fmt.Errorf("unable to refresh the service IP block: %v", err)
     }
 
-    r := ipallocator.NewCIDRRange(c.network)
+    rebuilt := ipallocator.NewCIDRRange(c.network)
+    // Check every Service's ClusterIP, and rebuild the state as we think it should be.
     for _, svc := range list.Items {
         if !api.IsServiceIPSet(&svc) {
+            // didn't need a cluster IP
             continue
         }
         ip := net.ParseIP(svc.Spec.ClusterIP)
         if ip == nil {
-            // cluster IP is broken, reallocate
+            // cluster IP is corrupt
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
             continue
         }
-        switch err := r.Allocate(ip); err {
+        // mark it as in-use
+        switch err := rebuilt.Allocate(ip); err {
         case nil:
+            if stored.Has(ip) {
+                // remove it from the old set, so we can find leaks
+                stored.Release(ip)
+            } else {
+                // cluster IP doesn't seem to be allocated
+                runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
+            }
+            delete(c.leaks, ip.String()) // it is used, so it can't be leaked
         case ipallocator.ErrAllocated:
             // TODO: send event
-            // cluster IP is broken, reallocate
+            // cluster IP is duplicate
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace))
         case ipallocator.ErrNotInRange:
             // TODO: send event
-            // cluster IP is broken, reallocate
+            // cluster IP is out of range
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
         case ipallocator.ErrFull:
             // TODO: send event
-            return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", r)
+            // somehow we are out of IPs
+            return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", rebuilt)
         default:
             return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err)
         }
     }
 
-    if err := r.Snapshot(latest); err != nil {
+    // Check for IPs that are left in the old set. They appear to have been leaked.
+    stored.ForEach(func(ip net.IP) {
+        count, found := c.leaks[ip.String()]
+        switch {
+        case !found:
+            // flag it to be cleaned up after any races (hopefully) are gone
+            runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked: flagging for later cleanup", ip))
+            count = numRepairsBeforeLeakCleanup - 1
+            fallthrough
+        case count > 0:
+            // pretend it is still in use until the count expires
+            c.leaks[ip.String()] = count - 1
+            if err := rebuilt.Allocate(ip); err != nil {
+                runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but cannot be allocated: %v", ip, err))
+            }
+        default:
+            // do not add it to the rebuilt set, which means it will be available for reuse
+            runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
+        }
+    })
+
+    // Blast the rebuilt state into storage.
+    if err := rebuilt.Snapshot(snapshot); err != nil {
         return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
     }
-
-    if err := c.alloc.CreateOrUpdate(latest); err != nil {
+    if err := c.alloc.CreateOrUpdate(snapshot); err != nil {
         if errors.IsConflict(err) {
             return err
         }
diff --git a/pkg/registry/core/service/ipallocator/controller/repair_test.go b/pkg/registry/core/service/ipallocator/controller/repair_test.go
index 9306d89c17..7aa3670de2 100644
--- a/pkg/registry/core/service/ipallocator/controller/repair_test.go
+++ b/pkg/registry/core/service/ipallocator/controller/repair_test.go
@@ -50,10 +50,10 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
 
 func TestRepair(t *testing.T) {
     fakeClient := fake.NewSimpleClientset()
-    _, cidr, _ := net.ParseCIDR("192.168.1.0/24")
     ipregistry := &mockRangeRegistry{
-        item: &api.RangeAllocation{},
+        item: &api.RangeAllocation{Range: "192.168.1.0/24"},
     }
+    _, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
     r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
 
     if err := r.RunOnce(); err != nil {
@@ -64,7 +64,7 @@ func TestRepair(t *testing.T) {
     }
 
     ipregistry = &mockRangeRegistry{
-        item:      &api.RangeAllocation{},
+        item:      &api.RangeAllocation{Range: "192.168.1.0/24"},
         updateErr: fmt.Errorf("test error"),
     }
     r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
@@ -73,7 +73,7 @@ func TestRepair(t *testing.T) {
     }
 }
 
-func TestRepairEmpty(t *testing.T) {
+func TestRepairLeak(t *testing.T) {
     _, cidr, _ := net.ParseCIDR("192.168.1.0/24")
     previous := ipallocator.NewCIDRRange(cidr)
     previous.Allocate(net.ParseIP("192.168.1.10"))
@@ -94,16 +94,31 @@
             Range: "192.168.1.0/24",
             Data:  dst.Data,
         },
     }
+
     r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
+    // Run through the "leak detection holdoff" loops.
+    for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
+        if err := r.RunOnce(); err != nil {
+            t.Fatal(err)
+        }
+        after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
+        if err != nil {
+            t.Fatal(err)
+        }
+        if !after.Has(net.ParseIP("192.168.1.10")) {
+            t.Errorf("expected ipallocator to still have leaked IP")
+        }
+    }
+    // Run one more time to actually remove the leak.
     if err := r.RunOnce(); err != nil {
         t.Fatal(err)
     }
-    after := ipallocator.NewCIDRRange(cidr)
-    if err := after.Restore(cidr, ipregistry.updated.Data); err != nil {
+    after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
+    if err != nil {
         t.Fatal(err)
     }
     if after.Has(net.ParseIP("192.168.1.10")) {
-        t.Errorf("unexpected ipallocator state: %#v", after)
+        t.Errorf("expected ipallocator to not have leaked IP")
     }
 }
@@ -157,14 +172,14 @@ func TestRepairWithExisting(t *testing.T) {
     if err := r.RunOnce(); err != nil {
         t.Fatal(err)
     }
-    after := ipallocator.NewCIDRRange(cidr)
-    if err := after.Restore(cidr, ipregistry.updated.Data); err != nil {
+    after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
+    if err != nil {
         t.Fatal(err)
    }
     if !after.Has(net.ParseIP("192.168.1.1")) || !after.Has(net.ParseIP("192.168.1.100")) {
         t.Errorf("unexpected ipallocator state: %#v", after)
     }
-    if after.Free() != 252 {
-        t.Errorf("unexpected ipallocator state: %#v", after)
+    if free := after.Free(); free != 252 {
+        t.Errorf("unexpected ipallocator state: %d free", free)
     }
 }
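
Two short sketches follow for context; neither is part of the patch above.

First, a minimal usage sketch of the new NewFromSnapshot, Used, and CIDR helpers added in allocator.go. It assumes the in-tree import paths as of this change; the CIDR and IP values are illustrative only.

package main

import (
    "fmt"
    "net"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
)

func main() {
    _, cidr, _ := net.ParseCIDR("10.0.0.0/24")
    r := ipallocator.NewCIDRRange(cidr)
    if err := r.Allocate(net.ParseIP("10.0.0.10")); err != nil {
        panic(err)
    }

    // Persist the allocator state into a RangeAllocation...
    var snap api.RangeAllocation
    if err := r.Snapshot(&snap); err != nil {
        panic(err)
    }

    // ...and rebuild it in one call, instead of NewCIDRRange + Restore.
    restored, err := ipallocator.NewFromSnapshot(&snap)
    if err != nil {
        panic(err)
    }
    ipnet := restored.CIDR()
    // Prints "10.0.0.0/24: 1 used, 253 free".
    fmt.Printf("%s: %d used, %d free\n", ipnet.String(), restored.Used(), restored.Free())
}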
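
Second, a self-contained sketch of the leak-holdoff pattern that repair.go now applies: an IP that looks leaked is kept for numRepairsBeforeLeakCleanup consecutive repair passes before it is actually released, so an allocation racing with the repair loop is not reclaimed prematurely. The shouldKeep helper and its package-level map are illustrative stand-ins for the logic inside stored.ForEach, not the patched code itself.

package main

import "fmt"

const numRepairsBeforeLeakCleanup = 3

// leaks counts, per IP, how many more repair passes to wait before cleanup.
var leaks = map[string]int{}

// shouldKeep reports whether a suspected-leaked IP should stay allocated
// for this pass, decrementing its holdoff counter as a side effect.
func shouldKeep(ip string) bool {
    count, found := leaks[ip]
    if !found {
        // First sighting: start the holdoff window.
        count = numRepairsBeforeLeakCleanup - 1
    }
    if count > 0 {
        leaks[ip] = count - 1
        return true // pretend it is still in use
    }
    delete(leaks, ip)
    return false // holdoff expired; release it for reuse
}

func main() {
    for pass := 1; pass <= numRepairsBeforeLeakCleanup; pass++ {
        fmt.Printf("pass %d: keep=%v\n", pass, shouldKeep("192.168.1.10"))
    }
}

Running it keeps the IP on passes 1 and 2 and releases it on pass 3, mirroring the numRepairsBeforeLeakCleanup-1 holdoff iterations exercised by TestRepairLeak.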