mirror of https://github.com/milvus-io/milvus.git
Fix ut: wait service ready after run Proxy server (#17208)
Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/17224/head
parent
7409bfc56d
commit
e44afc7aa3
|
@ -18,9 +18,21 @@ package grpcproxy
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
|
@ -780,6 +792,114 @@ func (m *MockProxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUse
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
type WaitOption struct {
|
||||
Duration time.Duration `json:"duration"`
|
||||
Port int `json:"port"`
|
||||
TLSEnabled bool `json:"tls_enabled"`
|
||||
ClientPemPath string `json:"client_pem_path"`
|
||||
ClientKeyPath string `json:"client_key_path"`
|
||||
CaPath string `json:"ca_path"`
|
||||
}
|
||||
|
||||
func (opt *WaitOption) String() string {
|
||||
s, err := json.Marshal(*opt)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("error: %s", err)
|
||||
}
|
||||
return string(s)
|
||||
}
|
||||
|
||||
func newWaitOption(duration time.Duration, Port int, tlsEnabled bool, clientPemPath, clientKeyPath, clientCaPath string) *WaitOption {
|
||||
return &WaitOption{
|
||||
Duration: duration,
|
||||
Port: Port,
|
||||
TLSEnabled: tlsEnabled,
|
||||
ClientPemPath: clientPemPath,
|
||||
ClientKeyPath: clientKeyPath,
|
||||
CaPath: clientCaPath,
|
||||
}
|
||||
}
|
||||
|
||||
func withCredential(clientPemPath, clientKeyPath, clientCaPath string) (credentials.TransportCredentials, error) {
|
||||
cert, err := tls.LoadX509KeyPair(clientPemPath, clientKeyPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
certPool := x509.NewCertPool()
|
||||
ca, err := os.ReadFile(clientCaPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ok := certPool.AppendCertsFromPEM(ca); !ok {
|
||||
return nil, errors.New("failed to AppendCertsFromPEM")
|
||||
}
|
||||
creds := credentials.NewTLS(&tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
ServerName: "localhost",
|
||||
RootCAs: certPool,
|
||||
MinVersion: tls.VersionTLS13,
|
||||
})
|
||||
return creds, nil
|
||||
}
|
||||
|
||||
// waitForGrpcReady block until service available or panic after times out.
|
||||
func waitForGrpcReady(opt *WaitOption) {
|
||||
ticker := time.NewTicker(opt.Duration)
|
||||
ch := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
// just used in UT to self-check service is available.
|
||||
address := "localhost:" + strconv.Itoa(opt.Port)
|
||||
if !opt.TLSEnabled {
|
||||
_, err := grpc.Dial(address, grpc.WithBlock(), grpc.WithInsecure())
|
||||
ch <- err
|
||||
} else {
|
||||
creds, err := withCredential(opt.ClientPemPath, opt.ClientKeyPath, opt.CaPath)
|
||||
if err != nil {
|
||||
ch <- err
|
||||
}
|
||||
_, err = grpc.Dial(address, grpc.WithBlock(), grpc.WithTransportCredentials(creds))
|
||||
ch <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
log.Error("grpc service not ready",
|
||||
zap.Error(err),
|
||||
zap.Any("option", opt))
|
||||
panic(err)
|
||||
}
|
||||
case <-ticker.C:
|
||||
log.Error("grpc service not ready",
|
||||
zap.Any("option", opt))
|
||||
panic("grpc service not ready")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: should tls-related configurations be hard code here?
|
||||
var waitDuration = time.Second * 1
|
||||
var clientPemPath = "../../../configs/cert/client.pem"
|
||||
var clientKeyPath = "../../../configs/cert/client.key"
|
||||
|
||||
// waitForServerReady wait for internal grpc service and external service to be ready, according to the params.
|
||||
func waitForServerReady() {
|
||||
waitForGrpcReady(newWaitOption(waitDuration, Params.InternalPort, false, "", "", ""))
|
||||
waitForGrpcReady(newWaitOption(waitDuration, Params.Port, Params.TLSEnabled, clientPemPath, clientKeyPath, Params.CaPemPath))
|
||||
}
|
||||
|
||||
func runAndWaitForServerReady(server *Server) error {
|
||||
err := server.Run()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
waitForServerReady()
|
||||
return nil
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
func Test_NewServer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
@ -794,7 +914,7 @@ func Test_NewServer(t *testing.T) {
|
|||
server.dataCoordClient = &MockDataCoord{}
|
||||
|
||||
t.Run("Run", func(t *testing.T) {
|
||||
err = server.Run()
|
||||
err = runAndWaitForServerReady(server)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
|
@ -1054,8 +1174,9 @@ func Test_NewServer(t *testing.T) {
|
|||
// Update config and start server again to test with different config set.
|
||||
// This works as config will be initialized only once
|
||||
proxy.Params.ProxyCfg.GinLogging = false
|
||||
err = server.Run()
|
||||
err = runAndWaitForServerReady(server)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = server.Stop()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
@ -1197,9 +1318,10 @@ func Test_NewServer_HTTPServerDisabled(t *testing.T) {
|
|||
HTTPParams.InitOnce()
|
||||
HTTPParams.Enabled = false
|
||||
|
||||
err = server.Run()
|
||||
err = runAndWaitForServerReady(server)
|
||||
assert.Nil(t, err)
|
||||
assert.Nil(t, server.httpServer)
|
||||
|
||||
err = server.Stop()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
@ -1220,9 +1342,9 @@ func Test_NewServer_TLS(t *testing.T) {
|
|||
Params.ServerKeyPath = "../../../configs/cert/server.key"
|
||||
Params.CaPemPath = "../../../configs/cert/ca.pem"
|
||||
|
||||
err = server.Run()
|
||||
err = runAndWaitForServerReady(server)
|
||||
assert.Nil(t, err)
|
||||
assert.Nil(t, server.httpServer)
|
||||
|
||||
err = server.Stop()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue