Merge pull request #1140 from r2d4/service-refactor

Refactor services into its own package
pull/1149/head
Matt Rickard 2017-02-17 15:39:36 -08:00 committed by GitHub
commit 88f8782d97
9 changed files with 457 additions and 498 deletions

View File

@ -25,6 +25,7 @@ import (
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/service"
)
var (
@ -86,7 +87,7 @@ minikube addons enable %s`, addonName, addonName))
namespace := "kube-system"
key := "kubernetes.io/minikube-addons-endpoint"
serviceList, err := cluster.GetServiceListByLabel(namespace, key, addonName)
serviceList, err := service.GetServiceListByLabel(namespace, key, addonName)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting service with namespace: %s and labels %s:%s: %s\n", namespace, key, addonName, err)
os.Exit(1)
@ -99,8 +100,8 @@ You can add one by annotating a service with the label %s:%s
os.Exit(0)
}
for i := range serviceList.Items {
service := serviceList.Items[i].ObjectMeta.Name
cluster.WaitAndMaybeOpenService(api, namespace, service, addonsURLTemplate, addonsURLMode, https)
svc := serviceList.Items[i].ObjectMeta.Name
service.WaitAndMaybeOpenService(api, namespace, svc, addonsURLTemplate, addonsURLMode, https)
}
},

View File

@ -27,6 +27,7 @@ import (
"github.com/spf13/cobra"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/service"
commonutil "k8s.io/minikube/pkg/util"
)
@ -50,14 +51,14 @@ var dashboardCmd = &cobra.Command{
cluster.EnsureMinikubeRunningOrExit(api, 1)
namespace := "kube-system"
service := "kubernetes-dashboard"
svc := "kubernetes-dashboard"
if err = commonutil.RetryAfter(20, func() error { return cluster.CheckService(namespace, service) }, 6*time.Second); err != nil {
fmt.Fprintf(os.Stderr, "Could not find finalized endpoint being pointed to by %s: %s\n", service, err)
if err = commonutil.RetryAfter(20, func() error { return service.CheckService(namespace, svc) }, 6*time.Second); err != nil {
fmt.Fprintf(os.Stderr, "Could not find finalized endpoint being pointed to by %s: %s\n", svc, err)
os.Exit(1)
}
urls, err := cluster.GetServiceURLsForService(api, namespace, service, template.Must(template.New("dashboardServiceFormat").Parse(defaultServiceFormatTemplate)))
urls, err := service.GetServiceURLsForService(api, namespace, svc, template.Must(template.New("dashboardServiceFormat").Parse(defaultServiceFormatTemplate)))
if err != nil {
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, "Check that minikube is running.")

View File

@ -21,14 +21,14 @@ import (
"os"
"text/template"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/minikube/service"
)
const defaultServiceFormatTemplate = "http://{{.IP}}:{{.Port}}"
var (
namespace string
https bool
@ -59,7 +59,7 @@ var serviceCmd = &cobra.Command{
os.Exit(1)
}
service := args[0]
svc := args[0]
api, err := machine.NewAPIClient(clientType)
if err != nil {
fmt.Fprintf(os.Stderr, "Error getting client: %s\n", err)
@ -68,17 +68,15 @@ var serviceCmd = &cobra.Command{
defer api.Close()
cluster.EnsureMinikubeRunningOrExit(api, 1)
if err := validateService(namespace, service); err != nil {
if err := service.ValidateService(namespace, svc); err != nil {
fmt.Fprintln(os.Stderr, fmt.Sprintf("service '%s' could not be found running in namespace '%s' within kubernetes",
service, namespace))
svc, namespace))
os.Exit(1)
}
cluster.WaitAndMaybeOpenService(api, namespace, service, serviceURLTemplate, serviceURLMode, https)
service.WaitAndMaybeOpenService(api, namespace, svc, serviceURLTemplate, serviceURLMode, https)
},
}
const defaultServiceFormatTemplate = "http://{{.IP}}:{{.Port}}"
func init() {
serviceCmd.Flags().StringVarP(&namespace, "namespace", "n", "default", "The service namespace")
serviceCmd.Flags().BoolVar(&serviceURLMode, "url", false, "Display the kubernetes service URL in the CLI instead of opening it in the default browser")
@ -88,49 +86,3 @@ func init() {
RootCmd.AddCommand(serviceCmd)
}
func validateService(namespace string, service string) error {
client, err := cluster.GetKubernetesClient()
if err != nil {
return errors.Wrap(err, "error validating input service name")
}
services := client.Services(namespace)
if _, err = services.Get(service); err != nil {
return errors.Wrapf(err, "service '%s' could not be found running in namespace '%s' within kubernetes", service, namespace)
}
return nil
}
// CheckService waits for the specified service to be ready by returning an error until the service is up
// The check is done by polling the endpoint associated with the service and when the endpoint exists, returning no error->service-online
func CheckService(namespace string, service string) error {
client, err := cluster.GetKubernetesClient()
if err != nil {
return &util.RetriableError{Err: err}
}
endpoints := client.Endpoints(namespace)
if err != nil {
return &util.RetriableError{Err: err}
}
endpoint, err := endpoints.Get(service)
if err != nil {
return &util.RetriableError{Err: err}
}
return CheckEndpointReady(endpoint)
}
const notReadyMsg = "Waiting, endpoint for service is not ready yet...\n"
func CheckEndpointReady(endpoint *v1.Endpoints) error {
if len(endpoint.Subsets) == 0 {
fmt.Fprintf(os.Stderr, notReadyMsg)
return &util.RetriableError{Err: errors.New("Endpoint for service is not ready yet")}
}
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) == 0 {
fmt.Fprintf(os.Stderr, notReadyMsg)
return &util.RetriableError{Err: errors.New("No endpoints for service are ready yet")}
}
}
return nil
}

View File

@ -25,8 +25,8 @@ import (
"github.com/spf13/cobra"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/minikube/machine"
"k8s.io/minikube/pkg/minikube/service"
)
var serviceListNamespace string
@ -43,7 +43,7 @@ var serviceListCmd = &cobra.Command{
os.Exit(1)
}
defer api.Close()
serviceURLs, err := cluster.GetServiceURLs(api, serviceListNamespace, serviceURLTemplate)
serviceURLs, err := service.GetServiceURLs(api, serviceListNamespace, serviceURLTemplate)
if err != nil {
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, "Check that minikube is running and that you have specified the correct namespace (-n flag) if required.")

View File

@ -1,55 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"testing"
"k8s.io/client-go/1.5/pkg/api/v1"
)
func TestCheckEndpointReady(t *testing.T) {
endpointNoSubsets := &v1.Endpoints{}
if err := CheckEndpointReady(endpointNoSubsets); err == nil {
t.Fatalf("Endpoint had no subsets but CheckEndpointReady did not return an error")
}
endpointNotReady := &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{Addresses: []v1.EndpointAddress{},
NotReadyAddresses: []v1.EndpointAddress{
{IP: "1.1.1.1"},
{IP: "2.2.2.2"},
{IP: "3.3.3.3"},
}}}}
if err := CheckEndpointReady(endpointNotReady); err == nil {
t.Fatalf("Endpoint had no Addresses but CheckEndpointReady did not return an error")
}
endpointReady := &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{Addresses: []v1.EndpointAddress{
{IP: "1.1.1.1"},
{IP: "2.2.2.2"},
},
NotReadyAddresses: []v1.EndpointAddress{},
}},
}
if err := CheckEndpointReady(endpointReady); err != nil {
t.Fatalf("Endpoint was ready with at least one Address, but CheckEndpointReady returned an error")
}
}

View File

@ -23,11 +23,9 @@ import (
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"path/filepath"
"strings"
"text/template"
"time"
"github.com/docker/machine/drivers/virtualbox"
@ -37,13 +35,8 @@ import (
"github.com/docker/machine/libmachine/host"
"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
"github.com/pkg/browser"
"github.com/pkg/errors"
"k8s.io/client-go/1.5/kubernetes"
corev1 "k8s.io/client-go/1.5/kubernetes/typed/core/v1"
kubeapi "k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/pkg/labels"
"k8s.io/client-go/1.5/tools/clientcmd"
"k8s.io/minikube/pkg/minikube/assets"
@ -417,104 +410,6 @@ func CreateSSHShell(api libmachine.API, args []string) error {
return client.Shell(strings.Join(args, " "))
}
func GetServiceURLsForService(api libmachine.API, namespace, service string, t *template.Template) ([]string, error) {
host, err := CheckIfApiExistsAndLoad(api)
if err != nil {
return nil, errors.Wrap(err, "Error checking if api exist and loading it")
}
ip, err := host.Driver.GetIP()
if err != nil {
return nil, errors.Wrap(err, "Error getting ip from host")
}
client, err := GetKubernetesClient()
if err != nil {
return nil, err
}
return getServiceURLsWithClient(client, ip, namespace, service, t)
}
func getServiceURLsWithClient(client *kubernetes.Clientset, ip, namespace, service string, t *template.Template) ([]string, error) {
if t == nil {
return nil, errors.New("Error, attempted to generate service url with nil --format template")
}
ports, err := getServicePorts(client, namespace, service)
if err != nil {
return nil, err
}
urls := []string{}
for _, port := range ports {
var doc bytes.Buffer
err = t.Execute(&doc, struct {
IP string
Port int32
}{
ip,
port,
})
if err != nil {
return nil, err
}
u, err := url.Parse(doc.String())
if err != nil {
return nil, err
}
urls = append(urls, u.String())
}
return urls, nil
}
type serviceGetter interface {
Get(name string) (*v1.Service, error)
List(kubeapi.ListOptions) (*v1.ServiceList, error)
}
func getServicePorts(client *kubernetes.Clientset, namespace, service string) ([]int32, error) {
services := client.Services(namespace)
return getServicePortsFromServiceGetter(services, service)
}
type MissingNodePortError struct {
service *v1.Service
}
func (e MissingNodePortError) Error() string {
return fmt.Sprintf("Service %s/%s does not have a node port. To have one assigned automatically, the service type must be NodePort or LoadBalancer, but this service is of type %s.", e.service.Namespace, e.service.Name, e.service.Spec.Type)
}
func getServiceFromServiceGetter(services serviceGetter, service string) (*v1.Service, error) {
svc, err := services.Get(service)
if err != nil {
return nil, fmt.Errorf("Error getting %s service: %s", service, err)
}
return svc, nil
}
func getServicePortsFromServiceGetter(services serviceGetter, service string) ([]int32, error) {
svc, err := getServiceFromServiceGetter(services, service)
if err != nil {
return nil, err
}
var nodePorts []int32
if len(svc.Spec.Ports) > 0 {
for _, port := range svc.Spec.Ports {
if port.NodePort > 0 {
nodePorts = append(nodePorts, port.NodePort)
}
}
}
if len(nodePorts) == 0 {
return nil, MissingNodePortError{svc}
}
return nodePorts, nil
}
func GetKubernetesClient() (*kubernetes.Clientset, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
configOverrides := &clientcmd.ConfigOverrides{}
@ -543,131 +438,3 @@ func EnsureMinikubeRunningOrExit(api libmachine.API, exitStatus int) {
os.Exit(exitStatus)
}
}
type ServiceURL struct {
Namespace string
Name string
URLs []string
}
type ServiceURLs []ServiceURL
func GetServiceURLs(api libmachine.API, namespace string, t *template.Template) (ServiceURLs, error) {
host, err := CheckIfApiExistsAndLoad(api)
if err != nil {
return nil, err
}
ip, err := host.Driver.GetIP()
if err != nil {
return nil, err
}
client, err := GetKubernetesClient()
if err != nil {
return nil, err
}
getter := client.Services(namespace)
svcs, err := getter.List(kubeapi.ListOptions{})
if err != nil {
return nil, err
}
var serviceURLs []ServiceURL
for _, svc := range svcs.Items {
urls, err := getServiceURLsWithClient(client, ip, svc.Namespace, svc.Name, t)
if err != nil {
if _, ok := err.(MissingNodePortError); ok {
serviceURLs = append(serviceURLs, ServiceURL{Namespace: svc.Namespace, Name: svc.Name})
continue
}
return nil, err
}
serviceURLs = append(serviceURLs, ServiceURL{Namespace: svc.Namespace, Name: svc.Name, URLs: urls})
}
return serviceURLs, nil
}
// CheckService waits for the specified service to be ready by returning an error until the service is up
// The check is done by polling the endpoint associated with the service and when the endpoint exists, returning no error->service-online
func CheckService(namespace string, service string) error {
client, err := GetKubernetesClient()
if err != nil {
return &util.RetriableError{Err: err}
}
endpoints := client.Endpoints(namespace)
if err != nil {
return &util.RetriableError{Err: err}
}
endpoint, err := endpoints.Get(service)
if err != nil {
return &util.RetriableError{Err: err}
}
return checkEndpointReady(endpoint)
}
func checkEndpointReady(endpoint *v1.Endpoints) error {
const notReadyMsg = "Waiting, endpoint for service is not ready yet...\n"
if len(endpoint.Subsets) == 0 {
fmt.Fprintf(os.Stderr, notReadyMsg)
return &util.RetriableError{Err: errors.New("Endpoint for service is not ready yet")}
}
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) == 0 {
fmt.Fprintf(os.Stderr, notReadyMsg)
return &util.RetriableError{Err: errors.New("No endpoints for service are ready yet")}
}
}
return nil
}
func WaitAndMaybeOpenService(api libmachine.API, namespace string, service string, urlTemplate *template.Template, urlMode bool, https bool) {
if err := util.RetryAfter(20, func() error { return CheckService(namespace, service) }, 6*time.Second); err != nil {
fmt.Fprintf(os.Stderr, "Could not find finalized endpoint being pointed to by %s: %s\n", service, err)
os.Exit(1)
}
urls, err := GetServiceURLsForService(api, namespace, service, urlTemplate)
if err != nil {
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, "Check that minikube is running and that you have specified the correct namespace (-n flag).")
os.Exit(1)
}
for _, url := range urls {
if https {
url = strings.Replace(url, "http", "https", 1)
}
if urlMode || !strings.HasPrefix(url, "http") {
fmt.Fprintln(os.Stdout, url)
} else {
fmt.Fprintln(os.Stdout, "Opening kubernetes service "+namespace+"/"+service+" in default browser...")
browser.OpenURL(url)
}
}
}
func GetServiceListByLabel(namespace string, key string, value string) (*v1.ServiceList, error) {
client, err := GetKubernetesClient()
if err != nil {
return &v1.ServiceList{}, &util.RetriableError{Err: err}
}
services := client.Services(namespace)
if err != nil {
return &v1.ServiceList{}, &util.RetriableError{Err: err}
}
return getServiceListFromServicesByLabel(services, key, value)
}
func getServiceListFromServicesByLabel(services corev1.ServiceInterface, key string, value string) (*v1.ServiceList, error) {
selector := labels.SelectorFromSet(labels.Set(map[string]string{key: value}))
serviceList, err := services.List(kubeapi.ListOptions{LabelSelector: selector})
if err != nil {
return &v1.ServiceList{}, &util.RetriableError{Err: err}
}
return serviceList, nil
}

View File

@ -31,11 +31,6 @@ import (
"github.com/docker/machine/libmachine/host"
"github.com/docker/machine/libmachine/provision"
"github.com/docker/machine/libmachine/state"
"github.com/pkg/errors"
"k8s.io/client-go/1.5/kubernetes/typed/core/v1/fake"
"k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/api/unversioned"
"k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/tests"
@ -530,73 +525,6 @@ func TestCreateSSHShell(t *testing.T) {
}
}
type MockServiceGetter struct {
services map[string]v1.Service
}
func NewMockServiceGetter() *MockServiceGetter {
return &MockServiceGetter{
services: make(map[string]v1.Service),
}
}
func (mockServiceGetter *MockServiceGetter) Get(name string) (*v1.Service, error) {
service, ok := mockServiceGetter.services[name]
if !ok {
return nil, errors.Errorf("Error getting %s service from mockServiceGetter", name)
}
return &service, nil
}
func (mockServiceGetter *MockServiceGetter) List(options api.ListOptions) (*v1.ServiceList, error) {
services := v1.ServiceList{
TypeMeta: unversioned.TypeMeta{Kind: "ServiceList", APIVersion: "v1"},
ListMeta: unversioned.ListMeta{},
}
for _, svc := range mockServiceGetter.services {
services.Items = append(services.Items, svc)
}
return &services, nil
}
func TestGetServiceURLs(t *testing.T) {
mockServiceGetter := NewMockServiceGetter()
expected := []int32{1111, 2222}
mockDashboardService := v1.Service{
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
NodePort: expected[0],
}, {
NodePort: expected[1],
}},
},
}
mockServiceGetter.services["mock-service"] = mockDashboardService
ports, err := getServicePortsFromServiceGetter(mockServiceGetter, "mock-service")
if err != nil {
t.Fatalf("Error getting mock-service ports from api: Error: %s", err)
}
for i := range ports {
if ports[i] != expected[i] {
t.Fatalf("Error getting mock-service port from api: Expected: %d, Got: %d", ports[0], expected)
}
}
}
func TestGetServiceURLWithoutNodePort(t *testing.T) {
mockServiceGetter := NewMockServiceGetter()
mockDashboardService := v1.Service{}
mockServiceGetter.services["mock-service"] = mockDashboardService
_, err := getServicePortsFromServiceGetter(mockServiceGetter, "mock-service")
if err == nil {
t.Fatalf("Expected error getting service with no node port")
}
}
func TestUpdateDefault(t *testing.T) {
s, _ := tests.NewSSHServer()
port, err := s.Start()
@ -782,77 +710,3 @@ func TestUpdateCustomAddons(t *testing.T) {
t.Fatalf("Custom addon not copied. Expected transfers to contain custom addon with content: %s. It was: %s", testContent2, transferred)
}
}
func TestCheckEndpointReady(t *testing.T) {
endpointNoSubsets := &v1.Endpoints{}
if err := checkEndpointReady(endpointNoSubsets); err == nil {
t.Fatalf("Endpoint had no subsets but checkEndpointReady did not return an error")
}
endpointNotReady := &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{Addresses: []v1.EndpointAddress{},
NotReadyAddresses: []v1.EndpointAddress{
{IP: "1.1.1.1"},
{IP: "2.2.2.2"},
{IP: "3.3.3.3"},
}}}}
if err := checkEndpointReady(endpointNotReady); err == nil {
t.Fatalf("Endpoint had no Addresses but checkEndpointReady did not return an error")
}
endpointReady := &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{Addresses: []v1.EndpointAddress{
{IP: "1.1.1.1"},
{IP: "2.2.2.2"},
},
NotReadyAddresses: []v1.EndpointAddress{},
}},
}
if err := checkEndpointReady(endpointReady); err != nil {
t.Fatalf("Endpoint was ready with at least one Address, but checkEndpointReady returned an error")
}
}
type ServiceInterfaceMock struct {
fake.FakeServices
ServiceList *v1.ServiceList
}
func (s ServiceInterfaceMock) List(opts api.ListOptions) (*v1.ServiceList, error) {
serviceList := &v1.ServiceList{
Items: []v1.Service{},
}
keyValArr := strings.Split(opts.LabelSelector.String(), "=")
for _, service := range s.ServiceList.Items {
if service.Spec.Selector[keyValArr[0]] == keyValArr[1] {
serviceList.Items = append(serviceList.Items, service)
}
}
return serviceList, nil
}
func TestGetServiceListFromServicesByLabel(t *testing.T) {
serviceList := &v1.ServiceList{
Items: []v1.Service{
{
Spec: v1.ServiceSpec{
Selector: map[string]string{
"foo": "bar",
},
},
},
},
}
serviceIface := ServiceInterfaceMock{
ServiceList: serviceList,
}
if _, err := getServiceListFromServicesByLabel(&serviceIface, "nothing", "nothing"); err != nil {
t.Fatalf("Service had no label match, but getServiceListFromServicesByLabel returned an error")
}
if _, err := getServiceListFromServicesByLabel(&serviceIface, "foo", "bar"); err != nil {
t.Fatalf("Endpoint was ready with at least one Address, but getServiceListFromServicesByLabel returned an error")
}
}

View File

@ -0,0 +1,270 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"bytes"
"fmt"
"net/url"
"os"
"strings"
"time"
"github.com/docker/machine/libmachine"
"github.com/pkg/browser"
"github.com/pkg/errors"
"k8s.io/client-go/1.5/kubernetes"
corev1 "k8s.io/client-go/1.5/kubernetes/typed/core/v1"
kubeapi "k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/api/v1"
"text/template"
"k8s.io/client-go/1.5/pkg/labels"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/util"
)
type ServiceURL struct {
Namespace string
Name string
URLs []string
}
type ServiceURLs []ServiceURL
func GetServiceURLs(api libmachine.API, namespace string, t *template.Template) (ServiceURLs, error) {
host, err := cluster.CheckIfApiExistsAndLoad(api)
if err != nil {
return nil, err
}
ip, err := host.Driver.GetIP()
if err != nil {
return nil, err
}
client, err := cluster.GetKubernetesClient()
if err != nil {
return nil, err
}
getter := client.Services(namespace)
svcs, err := getter.List(kubeapi.ListOptions{})
if err != nil {
return nil, err
}
var serviceURLs []ServiceURL
for _, svc := range svcs.Items {
urls, err := getServiceURLsWithClient(client, ip, svc.Namespace, svc.Name, t)
if err != nil {
if _, ok := err.(MissingNodePortError); ok {
serviceURLs = append(serviceURLs, ServiceURL{Namespace: svc.Namespace, Name: svc.Name})
continue
}
return nil, err
}
serviceURLs = append(serviceURLs, ServiceURL{Namespace: svc.Namespace, Name: svc.Name, URLs: urls})
}
return serviceURLs, nil
}
// CheckService waits for the specified service to be ready by returning an error until the service is up
// The check is done by polling the endpoint associated with the service and when the endpoint exists, returning no error->service-online
func CheckService(namespace string, service string) error {
client, err := cluster.GetKubernetesClient()
if err != nil {
return &util.RetriableError{Err: err}
}
endpoints := client.Endpoints(namespace)
if err != nil {
return &util.RetriableError{Err: err}
}
endpoint, err := endpoints.Get(service)
if err != nil {
return &util.RetriableError{Err: err}
}
return checkEndpointReady(endpoint)
}
func checkEndpointReady(endpoint *v1.Endpoints) error {
const notReadyMsg = "Waiting, endpoint for service is not ready yet...\n"
if len(endpoint.Subsets) == 0 {
fmt.Fprintf(os.Stderr, notReadyMsg)
return &util.RetriableError{Err: errors.New("Endpoint for service is not ready yet")}
}
for _, subset := range endpoint.Subsets {
if len(subset.Addresses) == 0 {
fmt.Fprintf(os.Stderr, notReadyMsg)
return &util.RetriableError{Err: errors.New("No endpoints for service are ready yet")}
}
}
return nil
}
func WaitAndMaybeOpenService(api libmachine.API, namespace string, service string, urlTemplate *template.Template, urlMode bool, https bool) {
if err := util.RetryAfter(20, func() error { return CheckService(namespace, service) }, 6*time.Second); err != nil {
fmt.Fprintf(os.Stderr, "Could not find finalized endpoint being pointed to by %s: %s\n", service, err)
os.Exit(1)
}
urls, err := GetServiceURLsForService(api, namespace, service, urlTemplate)
if err != nil {
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, "Check that minikube is running and that you have specified the correct namespace (-n flag).")
os.Exit(1)
}
for _, url := range urls {
if https {
url = strings.Replace(url, "http", "https", 1)
}
if urlMode || !strings.HasPrefix(url, "http") {
fmt.Fprintln(os.Stdout, url)
} else {
fmt.Fprintln(os.Stdout, "Opening kubernetes service "+namespace+"/"+service+" in default browser...")
browser.OpenURL(url)
}
}
}
func GetServiceListByLabel(namespace string, key string, value string) (*v1.ServiceList, error) {
client, err := cluster.GetKubernetesClient()
if err != nil {
return &v1.ServiceList{}, &util.RetriableError{Err: err}
}
services := client.Services(namespace)
if err != nil {
return &v1.ServiceList{}, &util.RetriableError{Err: err}
}
return getServiceListFromServicesByLabel(services, key, value)
}
func getServiceListFromServicesByLabel(services corev1.ServiceInterface, key string, value string) (*v1.ServiceList, error) {
selector := labels.SelectorFromSet(labels.Set(map[string]string{key: value}))
serviceList, err := services.List(kubeapi.ListOptions{LabelSelector: selector})
if err != nil {
return &v1.ServiceList{}, &util.RetriableError{Err: err}
}
return serviceList, nil
}
func getServicePortsFromServiceGetter(services serviceGetter, service string) ([]int32, error) {
svc, err := services.Get(service)
if err != nil {
return nil, fmt.Errorf("Error getting %s service: %s", service, err)
}
var nodePorts []int32
if len(svc.Spec.Ports) > 0 {
for _, port := range svc.Spec.Ports {
if port.NodePort > 0 {
nodePorts = append(nodePorts, port.NodePort)
}
}
}
if len(nodePorts) == 0 {
return nil, MissingNodePortError{svc}
}
return nodePorts, nil
}
type serviceGetter interface {
Get(name string) (*v1.Service, error)
List(kubeapi.ListOptions) (*v1.ServiceList, error)
}
func getServicePorts(client *kubernetes.Clientset, namespace, service string) ([]int32, error) {
services := client.Services(namespace)
return getServicePortsFromServiceGetter(services, service)
}
type MissingNodePortError struct {
service *v1.Service
}
func (e MissingNodePortError) Error() string {
return fmt.Sprintf("Service %s/%s does not have a node port. To have one assigned automatically, the service type must be NodePort or LoadBalancer, but this service is of type %s.", e.service.Namespace, e.service.Name, e.service.Spec.Type)
}
func GetServiceURLsForService(api libmachine.API, namespace, service string, t *template.Template) ([]string, error) {
host, err := cluster.CheckIfApiExistsAndLoad(api)
if err != nil {
return nil, errors.Wrap(err, "Error checking if api exist and loading it")
}
ip, err := host.Driver.GetIP()
if err != nil {
return nil, errors.Wrap(err, "Error getting ip from host")
}
client, err := cluster.GetKubernetesClient()
if err != nil {
return nil, err
}
return getServiceURLsWithClient(client, ip, namespace, service, t)
}
func getServiceURLsWithClient(client *kubernetes.Clientset, ip, namespace, service string, t *template.Template) ([]string, error) {
if t == nil {
return nil, errors.New("Error, attempted to generate service url with nil --format template")
}
ports, err := getServicePorts(client, namespace, service)
if err != nil {
return nil, err
}
urls := []string{}
for _, port := range ports {
var doc bytes.Buffer
err = t.Execute(&doc, struct {
IP string
Port int32
}{
ip,
port,
})
if err != nil {
return nil, err
}
u, err := url.Parse(doc.String())
if err != nil {
return nil, err
}
urls = append(urls, u.String())
}
return urls, nil
}
func ValidateService(namespace string, service string) error {
client, err := cluster.GetKubernetesClient()
if err != nil {
return errors.Wrap(err, "error validating input service name")
}
services := client.Services(namespace)
if _, err = services.Get(service); err != nil {
return errors.Wrapf(err, "service '%s' could not be found running in namespace '%s' within kubernetes", service, namespace)
}
return nil
}

View File

@ -0,0 +1,169 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"strings"
"testing"
"github.com/pkg/errors"
"k8s.io/client-go/1.5/kubernetes/typed/core/v1/fake"
"k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/api/unversioned"
"k8s.io/client-go/1.5/pkg/api/v1"
)
func TestCheckEndpointReady(t *testing.T) {
endpointNoSubsets := &v1.Endpoints{}
if err := checkEndpointReady(endpointNoSubsets); err == nil {
t.Fatalf("Endpoint had no subsets but checkEndpointReady did not return an error")
}
endpointNotReady := &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{Addresses: []v1.EndpointAddress{},
NotReadyAddresses: []v1.EndpointAddress{
{IP: "1.1.1.1"},
{IP: "2.2.2.2"},
{IP: "3.3.3.3"},
}}}}
if err := checkEndpointReady(endpointNotReady); err == nil {
t.Fatalf("Endpoint had no Addresses but checkEndpointReady did not return an error")
}
endpointReady := &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{Addresses: []v1.EndpointAddress{
{IP: "1.1.1.1"},
{IP: "2.2.2.2"},
},
NotReadyAddresses: []v1.EndpointAddress{},
}},
}
if err := checkEndpointReady(endpointReady); err != nil {
t.Fatalf("Endpoint was ready with at least one Address, but checkEndpointReady returned an error")
}
}
type ServiceInterfaceMock struct {
fake.FakeServices
ServiceList *v1.ServiceList
}
func (s ServiceInterfaceMock) List(opts api.ListOptions) (*v1.ServiceList, error) {
serviceList := &v1.ServiceList{
Items: []v1.Service{},
}
keyValArr := strings.Split(opts.LabelSelector.String(), "=")
for _, service := range s.ServiceList.Items {
if service.Spec.Selector[keyValArr[0]] == keyValArr[1] {
serviceList.Items = append(serviceList.Items, service)
}
}
return serviceList, nil
}
func TestGetServiceListFromServicesByLabel(t *testing.T) {
serviceList := &v1.ServiceList{
Items: []v1.Service{
{
Spec: v1.ServiceSpec{
Selector: map[string]string{
"foo": "bar",
},
},
},
},
}
serviceIface := ServiceInterfaceMock{
ServiceList: serviceList,
}
if _, err := getServiceListFromServicesByLabel(&serviceIface, "nothing", "nothing"); err != nil {
t.Fatalf("Service had no label match, but getServiceListFromServicesByLabel returned an error")
}
if _, err := getServiceListFromServicesByLabel(&serviceIface, "foo", "bar"); err != nil {
t.Fatalf("Endpoint was ready with at least one Address, but getServiceListFromServicesByLabel returned an error")
}
}
type MockServiceGetter struct {
services map[string]v1.Service
}
func NewMockServiceGetter() *MockServiceGetter {
return &MockServiceGetter{
services: make(map[string]v1.Service),
}
}
func (mockServiceGetter *MockServiceGetter) Get(name string) (*v1.Service, error) {
service, ok := mockServiceGetter.services[name]
if !ok {
return nil, errors.Errorf("Error getting %s service from mockServiceGetter", name)
}
return &service, nil
}
func (mockServiceGetter *MockServiceGetter) List(options api.ListOptions) (*v1.ServiceList, error) {
services := v1.ServiceList{
TypeMeta: unversioned.TypeMeta{Kind: "ServiceList", APIVersion: "v1"},
ListMeta: unversioned.ListMeta{},
}
for _, svc := range mockServiceGetter.services {
services.Items = append(services.Items, svc)
}
return &services, nil
}
func TestGetServiceURLs(t *testing.T) {
mockServiceGetter := NewMockServiceGetter()
expected := []int32{1111, 2222}
mockDashboardService := v1.Service{
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
{
NodePort: expected[0],
}, {
NodePort: expected[1],
}},
},
}
mockServiceGetter.services["mock-service"] = mockDashboardService
ports, err := getServicePortsFromServiceGetter(mockServiceGetter, "mock-service")
if err != nil {
t.Fatalf("Error getting mock-service ports from api: Error: %s", err)
}
for i := range ports {
if ports[i] != expected[i] {
t.Fatalf("Error getting mock-service port from api: Expected: %d, Got: %d", ports[0], expected)
}
}
}
func TestGetServiceURLWithoutNodePort(t *testing.T) {
mockServiceGetter := NewMockServiceGetter()
mockDashboardService := v1.Service{}
mockServiceGetter.services["mock-service"] = mockDashboardService
_, err := getServicePortsFromServiceGetter(mockServiceGetter, "mock-service")
if err == nil {
t.Fatalf("Expected error getting service with no node port")
}
}