mirror of https://github.com/milvus-io/milvus.git
enhance: [Cherry-pick] pack proxy connection code and support accesslog print SDK_Version (#28844)
relate: https://github.com/milvus-io/milvus/issues/28086 pr: https://github.com/milvus-io/milvus/pull/28835 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/29002/head
parent
8fd38c8eea
commit
cb7b1d1d51
|
@ -203,20 +203,20 @@ proxy:
|
||||||
enable: true
|
enable: true
|
||||||
# Log filename, set as "" to use stdout.
|
# Log filename, set as "" to use stdout.
|
||||||
filename: ""
|
filename: ""
|
||||||
# define formatters for access log by names:{format: XXX, method:[XXX]}
|
# define formatters for access log by XXX:{format: XXX, method:[XXX,XXX]}
|
||||||
formatters:
|
formatters:
|
||||||
# "base" formatter could not set method
|
# "base" formatter could not set methods
|
||||||
# all method will use "base" formatter default
|
# all method will use "base" formatter default
|
||||||
base:
|
base:
|
||||||
# will not print access log if set as ""
|
# will not print access log if set as ""
|
||||||
format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_name-$method_status-$error_code [traceID: $trace_id] [timeCost: $time_cost]"
|
format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_name [status: $method_status] [code: $error_code] [msg: $error_msg] [traceID: $trace_id] [timeCost: $time_cost]"
|
||||||
query:
|
query:
|
||||||
format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_status-$method_name [traceID: $trace_id] [timeCost: $time_cost] [database: $database_name] [collection: $collection_name] [partitions: $partition_name] [expr: $method_expr]"
|
format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_name [status: $method_status] [code: $error_code] [msg: $error_msg] [traceID: $trace_id] [timeCost: $time_cost] [database: $database_name] [collection: $collection_name] [partitions: $partition_name] [expr: $method_expr]"
|
||||||
# set formatter owners by method name(method was all milvus external interface)
|
# set formatter owners by method name(method was all milvus external interface)
|
||||||
# all method will use base formatter default
|
# all method will use base formatter default
|
||||||
# one method only could use one formatter
|
# one method only could use one formatter
|
||||||
# if set a method formatter mutiple times, will use random fomatter.
|
# if set a method formatter mutiple times, will use random fomatter.
|
||||||
methods: ["Query", "Search"]
|
methods: ["Query", "Search", "Delete"]
|
||||||
# localPath: /tmp/milvus_accesslog // log file rootpath
|
# localPath: /tmp/milvus_accesslog // log file rootpath
|
||||||
# maxSize: 64 # max log file size(MB) of singal log file, mean close when time <= 0.
|
# maxSize: 64 # max log file size(MB) of singal log file, mean close when time <= 0.
|
||||||
# rotatedTime: 0 # max time range of singal log file, mean close when time <= 0;
|
# rotatedTime: 0 # max time range of singal log file, mean close when time <= 0;
|
||||||
|
|
|
@ -59,6 +59,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||||
"github.com/milvus-io/milvus/internal/proxy"
|
"github.com/milvus-io/milvus/internal/proxy"
|
||||||
"github.com/milvus-io/milvus/internal/proxy/accesslog"
|
"github.com/milvus-io/milvus/internal/proxy/accesslog"
|
||||||
|
"github.com/milvus-io/milvus/internal/proxy/connection"
|
||||||
"github.com/milvus-io/milvus/internal/types"
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
"github.com/milvus-io/milvus/internal/util/componentutil"
|
"github.com/milvus-io/milvus/internal/util/componentutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||||
|
@ -250,7 +251,7 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
|
||||||
logutil.UnaryTraceLoggerInterceptor,
|
logutil.UnaryTraceLoggerInterceptor,
|
||||||
proxy.RateLimitInterceptor(limiter),
|
proxy.RateLimitInterceptor(limiter),
|
||||||
accesslog.UnaryUpdateAccessInfoInterceptor,
|
accesslog.UnaryUpdateAccessInfoInterceptor,
|
||||||
proxy.KeepActiveInterceptor,
|
connection.KeepActiveInterceptor,
|
||||||
))
|
))
|
||||||
} else {
|
} else {
|
||||||
unaryServerOption = grpc.EmptyServerOption{}
|
unaryServerOption = grpc.EmptyServerOption{}
|
||||||
|
|
|
@ -48,6 +48,7 @@ var metricFuncMap = map[string]getMetricFunc{
|
||||||
"$time_start": getTimeStart,
|
"$time_start": getTimeStart,
|
||||||
"$time_end": getTimeEnd,
|
"$time_end": getTimeEnd,
|
||||||
"$method_expr": getExpr,
|
"$method_expr": getExpr,
|
||||||
|
"$sdk_version": getSdkVersion,
|
||||||
}
|
}
|
||||||
|
|
||||||
var BaseFormatterKey = "base"
|
var BaseFormatterKey = "base"
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus/internal/proxy/connection"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/requestutil"
|
"github.com/milvus-io/milvus/pkg/util/requestutil"
|
||||||
)
|
)
|
||||||
|
@ -40,22 +41,22 @@ type AccessInfo interface {
|
||||||
|
|
||||||
type GrpcAccessInfo struct {
|
type GrpcAccessInfo struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
info *grpc.UnaryServerInfo
|
|
||||||
status *commonpb.Status
|
status *commonpb.Status
|
||||||
req interface{}
|
req interface{}
|
||||||
resp interface{}
|
resp interface{}
|
||||||
err error
|
err error
|
||||||
|
|
||||||
start time.Time
|
grpcInfo *grpc.UnaryServerInfo
|
||||||
end time.Time
|
start time.Time
|
||||||
|
end time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcAccessInfo(ctx context.Context, info *grpc.UnaryServerInfo, req interface{}) *GrpcAccessInfo {
|
func NewGrpcAccessInfo(ctx context.Context, grpcInfo *grpc.UnaryServerInfo, req interface{}) *GrpcAccessInfo {
|
||||||
accessInfo := &GrpcAccessInfo{
|
accessInfo := &GrpcAccessInfo{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
info: info,
|
grpcInfo: grpcInfo,
|
||||||
req: req,
|
req: req,
|
||||||
start: time.Now(),
|
start: time.Now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
return accessInfo
|
return accessInfo
|
||||||
|
@ -137,7 +138,7 @@ func getTimeEnd(i *GrpcAccessInfo) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMethodName(i *GrpcAccessInfo) string {
|
func getMethodName(i *GrpcAccessInfo) string {
|
||||||
_, methodName := path.Split(i.info.FullMethod)
|
_, methodName := path.Split(i.grpcInfo.FullMethod)
|
||||||
return methodName
|
return methodName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,3 +261,11 @@ func getExpr(i *GrpcAccessInfo) string {
|
||||||
}
|
}
|
||||||
return expr.(string)
|
return expr.(string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getSdkVersion(i *GrpcAccessInfo) string {
|
||||||
|
clientInfo := connection.GetManager().Get(i.ctx)
|
||||||
|
if clientInfo == nil {
|
||||||
|
return unknownString
|
||||||
|
}
|
||||||
|
return clientInfo.SdkType + "-" + clientInfo.SdkVersion
|
||||||
|
}
|
||||||
|
|
|
@ -29,7 +29,9 @@ import (
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
|
"github.com/milvus-io/milvus/internal/proxy/connection"
|
||||||
"github.com/milvus-io/milvus/pkg/util"
|
"github.com/milvus-io/milvus/pkg/util"
|
||||||
"github.com/milvus-io/milvus/pkg/util/crypto"
|
"github.com/milvus-io/milvus/pkg/util/crypto"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
|
@ -63,8 +65,8 @@ func (s *GrpcAccessInfoSuite) SetupSuite() {
|
||||||
}
|
}
|
||||||
|
|
||||||
s.info = &GrpcAccessInfo{
|
s.info = &GrpcAccessInfo{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
info: serverinfo,
|
grpcInfo: serverinfo,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,6 +110,26 @@ func (s *GrpcAccessInfoSuite) TestDbName() {
|
||||||
s.Equal("test", result[0])
|
s.Equal("test", result[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *GrpcAccessInfoSuite) TestSdkInfo() {
|
||||||
|
ctx := context.Background()
|
||||||
|
s.info.ctx = ctx
|
||||||
|
result := s.info.Get("$sdk_version")
|
||||||
|
s.Equal(unknownString, result[0])
|
||||||
|
|
||||||
|
identifier := 11111
|
||||||
|
md := metadata.MD{util.IdentifierKey: []string{fmt.Sprint(identifier)}}
|
||||||
|
ctx = metadata.NewIncomingContext(ctx, md)
|
||||||
|
info := &commonpb.ClientInfo{
|
||||||
|
SdkType: "test",
|
||||||
|
SdkVersion: "1.0",
|
||||||
|
}
|
||||||
|
connection.GetManager().Register(ctx, int64(identifier), info)
|
||||||
|
|
||||||
|
s.info.ctx = ctx
|
||||||
|
result = s.info.Get("$sdk_version")
|
||||||
|
s.Equal(info.SdkType+"-"+info.SdkVersion, result[0])
|
||||||
|
}
|
||||||
|
|
||||||
func TestGrpcAccssInfo(t *testing.T) {
|
func TestGrpcAccssInfo(t *testing.T) {
|
||||||
suite.Run(t, new(GrpcAccessInfoSuite))
|
suite.Run(t, new(GrpcAccessInfoSuite))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,50 +0,0 @@
|
||||||
package proxy
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
type clientInfo struct {
|
|
||||||
*commonpb.ClientInfo
|
|
||||||
identifier int64
|
|
||||||
lastActiveTime time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
func getLoggerOfClientInfo(info *commonpb.ClientInfo) []zap.Field {
|
|
||||||
fields := []zap.Field{
|
|
||||||
zap.String("sdk_type", info.GetSdkType()),
|
|
||||||
zap.String("sdk_version", info.GetSdkVersion()),
|
|
||||||
zap.String("local_time", info.GetLocalTime()),
|
|
||||||
zap.String("user", info.GetUser()),
|
|
||||||
zap.String("host", info.GetHost()),
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := range info.GetReserved() {
|
|
||||||
fields = append(fields, zap.String(k, v))
|
|
||||||
}
|
|
||||||
|
|
||||||
return fields
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clientInfo) getLogger() []zap.Field {
|
|
||||||
fields := getLoggerOfClientInfo(c.ClientInfo)
|
|
||||||
fields = append(fields,
|
|
||||||
zap.Int64("identifier", c.identifier),
|
|
||||||
zap.Time("last_active_time", c.lastActiveTime),
|
|
||||||
)
|
|
||||||
return fields
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clientInfo) ctxLogRegister(ctx context.Context) {
|
|
||||||
log.Ctx(ctx).Info("client register", c.getLogger()...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clientInfo) logDeregister() {
|
|
||||||
log.Info("client deregister", c.getLogger()...)
|
|
||||||
}
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
package connection
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type clientInfo struct {
|
||||||
|
*commonpb.ClientInfo
|
||||||
|
identifier int64
|
||||||
|
lastActiveTime time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientInfo) GetLogger() []zap.Field {
|
||||||
|
fields := ZapClientInfo(c.ClientInfo)
|
||||||
|
fields = append(fields,
|
||||||
|
zap.Int64("identifier", c.identifier),
|
||||||
|
zap.Time("last_active_time", c.lastActiveTime),
|
||||||
|
)
|
||||||
|
return fields
|
||||||
|
}
|
|
@ -0,0 +1,16 @@
|
||||||
|
package connection
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
var connectionManagerInstance *connectionManager
|
||||||
|
|
||||||
|
var getConnectionManagerInstanceOnce sync.Once
|
||||||
|
|
||||||
|
func GetManager() *connectionManager {
|
||||||
|
getConnectionManagerInstanceOnce.Do(func() {
|
||||||
|
connectionManagerInstance = newConnectionManager(
|
||||||
|
withDuration(defaultConnCheckDuration),
|
||||||
|
withTTL(defaultTTLForInactiveConn))
|
||||||
|
})
|
||||||
|
return connectionManagerInstance
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package proxy
|
package connection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -61,7 +61,7 @@ func (s *connectionManager) init() {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *connectionManager) stop() {
|
func (s *connectionManager) Stop() {
|
||||||
s.stopOnce.Do(func() {
|
s.stopOnce.Do(func() {
|
||||||
close(s.closeSignal)
|
close(s.closeSignal)
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
@ -80,14 +80,14 @@ func (s *connectionManager) checkLoop() {
|
||||||
log.Info("connection manager closed")
|
log.Info("connection manager closed")
|
||||||
return
|
return
|
||||||
case identifier := <-s.buffer:
|
case identifier := <-s.buffer:
|
||||||
s.update(identifier)
|
s.Update(identifier)
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
s.removeLongInactiveClients()
|
s.removeLongInactiveClients()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *connectionManager) register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) {
|
func (s *connectionManager) Register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) {
|
||||||
cli := clientInfo{
|
cli := clientInfo{
|
||||||
ClientInfo: info,
|
ClientInfo: info,
|
||||||
identifier: identifier,
|
identifier: identifier,
|
||||||
|
@ -98,38 +98,15 @@ func (s *connectionManager) register(ctx context.Context, identifier int64, info
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
s.clientInfos[identifier] = cli
|
s.clientInfos[identifier] = cli
|
||||||
cli.ctxLogRegister(ctx)
|
log.Ctx(ctx).Info("client register", cli.GetLogger()...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *connectionManager) keepActive(identifier int64) {
|
func (s *connectionManager) KeepActive(identifier int64) {
|
||||||
// make this asynchronous and then the rpc won't be blocked too long.
|
// make this asynchronous and then the rpc won't be blocked too long.
|
||||||
s.buffer <- identifier
|
s.buffer <- identifier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *connectionManager) update(identifier int64) {
|
func (s *connectionManager) List() []*commonpb.ClientInfo {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
cli, ok := s.clientInfos[identifier]
|
|
||||||
if ok {
|
|
||||||
cli.lastActiveTime = time.Now()
|
|
||||||
s.clientInfos[identifier] = cli
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *connectionManager) removeLongInactiveClients() {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
for candidate, cli := range s.clientInfos {
|
|
||||||
if time.Since(cli.lastActiveTime) > s.ttl {
|
|
||||||
cli.logDeregister()
|
|
||||||
delete(s.clientInfos, candidate)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *connectionManager) list() []*commonpb.ClientInfo {
|
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
@ -151,6 +128,44 @@ func (s *connectionManager) list() []*commonpb.ClientInfo {
|
||||||
return clients
|
return clients
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *connectionManager) Get(ctx context.Context) *commonpb.ClientInfo {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
identifier, err := GetIdentifierFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cli, ok := s.clientInfos[identifier]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return cli.ClientInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *connectionManager) Update(identifier int64) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
cli, ok := s.clientInfos[identifier]
|
||||||
|
if ok {
|
||||||
|
cli.lastActiveTime = time.Now()
|
||||||
|
s.clientInfos[identifier] = cli
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *connectionManager) removeLongInactiveClients() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
for candidate, cli := range s.clientInfos {
|
||||||
|
if time.Since(cli.lastActiveTime) > s.ttl {
|
||||||
|
log.Info("client deregister", cli.GetLogger()...)
|
||||||
|
delete(s.clientInfos, candidate)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newConnectionManager(opts ...connectionManagerOption) *connectionManager {
|
func newConnectionManager(opts ...connectionManagerOption) *connectionManager {
|
||||||
s := &connectionManager{
|
s := &connectionManager{
|
||||||
closeSignal: make(chan struct{}, 1),
|
closeSignal: make(chan struct{}, 1),
|
||||||
|
@ -164,16 +179,3 @@ func newConnectionManager(opts ...connectionManagerOption) *connectionManager {
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
var connectionManagerInstance *connectionManager
|
|
||||||
|
|
||||||
var getConnectionManagerInstanceOnce sync.Once
|
|
||||||
|
|
||||||
func GetConnectionManager() *connectionManager {
|
|
||||||
getConnectionManagerInstanceOnce.Do(func() {
|
|
||||||
connectionManagerInstance = newConnectionManager(
|
|
||||||
withDuration(defaultConnCheckDuration),
|
|
||||||
withTTL(defaultTTLForInactiveConn))
|
|
||||||
})
|
|
||||||
return connectionManagerInstance
|
|
||||||
}
|
|
|
@ -1,4 +1,4 @@
|
||||||
package proxy
|
package connection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -32,7 +32,7 @@ func Test_connectionManager_apply(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetConnectionManager(t *testing.T) {
|
func TestGetConnectionManager(t *testing.T) {
|
||||||
s := GetConnectionManager()
|
s := GetManager()
|
||||||
assert.Equal(t, defaultConnCheckDuration, s.duration)
|
assert.Equal(t, defaultConnCheckDuration, s.duration)
|
||||||
assert.Equal(t, defaultTTLForInactiveConn, s.ttl)
|
assert.Equal(t, defaultTTLForInactiveConn, s.ttl)
|
||||||
}
|
}
|
||||||
|
@ -42,28 +42,28 @@ func TestConnectionManager(t *testing.T) {
|
||||||
withDuration(time.Millisecond*5),
|
withDuration(time.Millisecond*5),
|
||||||
withTTL(time.Millisecond*100))
|
withTTL(time.Millisecond*100))
|
||||||
|
|
||||||
s.register(context.TODO(), 1, &commonpb.ClientInfo{
|
s.Register(context.TODO(), 1, &commonpb.ClientInfo{
|
||||||
Reserved: map[string]string{"for_test": "for_test"},
|
Reserved: map[string]string{"for_test": "for_test"},
|
||||||
})
|
})
|
||||||
assert.Equal(t, 1, len(s.list()))
|
assert.Equal(t, 1, len(s.List()))
|
||||||
|
|
||||||
// register duplicate.
|
// register duplicate.
|
||||||
s.register(context.TODO(), 1, &commonpb.ClientInfo{})
|
s.Register(context.TODO(), 1, &commonpb.ClientInfo{})
|
||||||
assert.Equal(t, 1, len(s.list()))
|
assert.Equal(t, 1, len(s.List()))
|
||||||
|
|
||||||
s.register(context.TODO(), 2, &commonpb.ClientInfo{})
|
s.Register(context.TODO(), 2, &commonpb.ClientInfo{})
|
||||||
assert.Equal(t, 2, len(s.list()))
|
assert.Equal(t, 2, len(s.List()))
|
||||||
|
|
||||||
s.keepActive(1)
|
s.KeepActive(1)
|
||||||
s.keepActive(2)
|
s.KeepActive(2)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 5)
|
time.Sleep(time.Millisecond * 5)
|
||||||
assert.Equal(t, 2, len(s.list()))
|
assert.Equal(t, 2, len(s.List()))
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(time.Millisecond * 100)
|
||||||
assert.Equal(t, 0, len(s.list()))
|
assert.Equal(t, 0, len(s.List()))
|
||||||
|
|
||||||
s.stop()
|
s.Stop()
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 5)
|
time.Sleep(time.Millisecond * 5)
|
||||||
}
|
}
|
|
@ -1,18 +1,36 @@
|
||||||
package proxy
|
package connection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus/pkg/util"
|
"github.com/milvus-io/milvus/pkg/util"
|
||||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getIdentifierFromContext(ctx context.Context) (int64, error) {
|
func ZapClientInfo(info *commonpb.ClientInfo) []zap.Field {
|
||||||
|
fields := []zap.Field{
|
||||||
|
zap.String("sdk_type", info.GetSdkType()),
|
||||||
|
zap.String("sdk_version", info.GetSdkVersion()),
|
||||||
|
zap.String("local_time", info.GetLocalTime()),
|
||||||
|
zap.String("user", info.GetUser()),
|
||||||
|
zap.String("host", info.GetHost()),
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range info.GetReserved() {
|
||||||
|
fields = append(fields, zap.String(k, v))
|
||||||
|
}
|
||||||
|
|
||||||
|
return fields
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetIdentifierFromContext(ctx context.Context) (int64, error) {
|
||||||
md, ok := metadata.FromIncomingContext(ctx)
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, fmt.Errorf("fail to get metadata from the context")
|
return 0, fmt.Errorf("fail to get metadata from the context")
|
||||||
|
@ -33,9 +51,9 @@ func KeepActiveInterceptor(ctx context.Context, req any, info *grpc.UnaryServerI
|
||||||
// On the other hand, too many goroutines will also influence the rpc.
|
// On the other hand, too many goroutines will also influence the rpc.
|
||||||
// Not sure which way is better, since actually we already make the `keepActive` asynchronous.
|
// Not sure which way is better, since actually we already make the `keepActive` asynchronous.
|
||||||
go func() {
|
go func() {
|
||||||
identifier, err := getIdentifierFromContext(ctx)
|
identifier, err := GetIdentifierFromContext(ctx)
|
||||||
if err == nil && funcutil.CheckCtxValid(ctx) {
|
if err == nil && funcutil.CheckCtxValid(ctx) {
|
||||||
GetConnectionManager().keepActive(identifier)
|
GetManager().KeepActive(identifier)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package proxy
|
package connection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -12,14 +12,14 @@ import (
|
||||||
func Test_getIdentifierFromContext(t *testing.T) {
|
func Test_getIdentifierFromContext(t *testing.T) {
|
||||||
t.Run("metadata not found", func(t *testing.T) {
|
t.Run("metadata not found", func(t *testing.T) {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
_, err := getIdentifierFromContext(ctx)
|
_, err := GetIdentifierFromContext(ctx)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("no identifier", func(t *testing.T) {
|
t.Run("no identifier", func(t *testing.T) {
|
||||||
md := metadata.New(map[string]string{})
|
md := metadata.New(map[string]string{})
|
||||||
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
||||||
_, err := getIdentifierFromContext(ctx)
|
_, err := GetIdentifierFromContext(ctx)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ func Test_getIdentifierFromContext(t *testing.T) {
|
||||||
"identifier": "i-am-not-invalid-identifier",
|
"identifier": "i-am-not-invalid-identifier",
|
||||||
})
|
})
|
||||||
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
||||||
_, err := getIdentifierFromContext(ctx)
|
_, err := GetIdentifierFromContext(ctx)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ func Test_getIdentifierFromContext(t *testing.T) {
|
||||||
"identifier": "20230518",
|
"identifier": "20230518",
|
||||||
})
|
})
|
||||||
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
||||||
identifier, err := getIdentifierFromContext(ctx)
|
identifier, err := GetIdentifierFromContext(ctx)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, int64(20230518), identifier)
|
assert.Equal(t, int64(20230518), identifier)
|
||||||
})
|
})
|
|
@ -41,6 +41,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
|
"github.com/milvus-io/milvus/internal/proxy/connection"
|
||||||
"github.com/milvus-io/milvus/internal/util/importutil"
|
"github.com/milvus-io/milvus/internal/util/importutil"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
@ -4982,7 +4983,7 @@ func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest
|
||||||
}
|
}
|
||||||
|
|
||||||
db := GetCurDBNameFromContextOrDefault(ctx)
|
db := GetCurDBNameFromContextOrDefault(ctx)
|
||||||
logsToBePrinted := append(getLoggerOfClientInfo(request.GetClientInfo()), zap.String("db", db))
|
logsToBePrinted := append(connection.ZapClientInfo(request.GetClientInfo()), zap.String("db", db))
|
||||||
log := log.Ctx(ctx).With(logsToBePrinted...)
|
log := log.Ctx(ctx).With(logsToBePrinted...)
|
||||||
|
|
||||||
log.Info("connect received")
|
log.Info("connect received")
|
||||||
|
@ -5027,7 +5028,7 @@ func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest
|
||||||
Reserved: make(map[string]string),
|
Reserved: make(map[string]string),
|
||||||
}
|
}
|
||||||
|
|
||||||
GetConnectionManager().register(ctx, int64(ts), request.GetClientInfo())
|
connection.GetManager().Register(ctx, int64(ts), request.GetClientInfo())
|
||||||
|
|
||||||
return &milvuspb.ConnectResponse{
|
return &milvuspb.ConnectResponse{
|
||||||
Status: merr.Success(),
|
Status: merr.Success(),
|
||||||
|
@ -5139,8 +5140,7 @@ func (node *Proxy) ListClientInfos(ctx context.Context, req *proxypb.ListClientI
|
||||||
return &proxypb.ListClientInfosResponse{Status: merr.Status(err)}, nil
|
return &proxypb.ListClientInfosResponse{Status: merr.Status(err)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
clients := GetConnectionManager().list()
|
clients := connection.GetManager().List()
|
||||||
|
|
||||||
return &proxypb.ListClientInfosResponse{
|
return &proxypb.ListClientInfosResponse{
|
||||||
Status: merr.Success(),
|
Status: merr.Success(),
|
||||||
ClientInfos: clients,
|
ClientInfos: clients,
|
||||||
|
|
|
@ -35,6 +35,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/allocator"
|
"github.com/milvus-io/milvus/internal/allocator"
|
||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/proxy/accesslog"
|
"github.com/milvus-io/milvus/internal/proxy/accesslog"
|
||||||
|
"github.com/milvus-io/milvus/internal/proxy/connection"
|
||||||
"github.com/milvus-io/milvus/internal/types"
|
"github.com/milvus-io/milvus/internal/types"
|
||||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||||
|
@ -476,8 +477,7 @@ func (node *Proxy) Stop() error {
|
||||||
// https://github.com/milvus-io/milvus/issues/12282
|
// https://github.com/milvus-io/milvus/issues/12282
|
||||||
node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||||
|
|
||||||
GetConnectionManager().stop()
|
connection.GetManager().Stop()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue