diff --git a/configs/milvus.yaml b/configs/milvus.yaml index dbc4673404..6a694a8b92 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -203,20 +203,20 @@ proxy: enable: true # Log filename, set as "" to use stdout. filename: "" - # define formatters for access log by names:{format: XXX, method:[XXX]} + # define formatters for access log by XXX:{format: XXX, method:[XXX,XXX]} formatters: - # "base" formatter could not set method - # all method will use "base" formatter default + # "base" formatter could not set methods + # all method will use "base" formatter default base: # will not print access log if set as "" - format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_name-$method_status-$error_code [traceID: $trace_id] [timeCost: $time_cost]" + format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_name [status: $method_status] [code: $error_code] [msg: $error_msg] [traceID: $trace_id] [timeCost: $time_cost]" query: - format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_status-$method_name [traceID: $trace_id] [timeCost: $time_cost] [database: $database_name] [collection: $collection_name] [partitions: $partition_name] [expr: $method_expr]" + format: "[$time_now] [ACCESS] <$user_name: $user_addr> $method_name [status: $method_status] [code: $error_code] [msg: $error_msg] [traceID: $trace_id] [timeCost: $time_cost] [database: $database_name] [collection: $collection_name] [partitions: $partition_name] [expr: $method_expr]" # set formatter owners by method name(method was all milvus external interface) # all method will use base formatter default # one method only could use one formatter # if set a method formatter mutiple times, will use random fomatter. - methods: ["Query", "Search"] + methods: ["Query", "Search", "Delete"] # localPath: /tmp/milvus_accesslog // log file rootpath # maxSize: 64 # max log file size(MB) of singal log file, mean close when time <= 0. # rotatedTime: 0 # max time range of singal log file, mean close when time <= 0; diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 6e79efe429..8f52f39a84 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -59,6 +59,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proxy" "github.com/milvus-io/milvus/internal/proxy/accesslog" + "github.com/milvus-io/milvus/internal/proxy/connection" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/dependency" @@ -250,7 +251,7 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) { logutil.UnaryTraceLoggerInterceptor, proxy.RateLimitInterceptor(limiter), accesslog.UnaryUpdateAccessInfoInterceptor, - proxy.KeepActiveInterceptor, + connection.KeepActiveInterceptor, )) } else { unaryServerOption = grpc.EmptyServerOption{} diff --git a/internal/proxy/accesslog/formatter.go b/internal/proxy/accesslog/formatter.go index 33b9fd3357..fae0ea4278 100644 --- a/internal/proxy/accesslog/formatter.go +++ b/internal/proxy/accesslog/formatter.go @@ -48,6 +48,7 @@ var metricFuncMap = map[string]getMetricFunc{ "$time_start": getTimeStart, "$time_end": getTimeEnd, "$method_expr": getExpr, + "$sdk_version": getSdkVersion, } var BaseFormatterKey = "base" diff --git a/internal/proxy/accesslog/info.go b/internal/proxy/accesslog/info.go index 72cb5b3a56..488347447d 100644 --- a/internal/proxy/accesslog/info.go +++ b/internal/proxy/accesslog/info.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/status" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proxy/connection" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/requestutil" ) @@ -40,22 +41,22 @@ type AccessInfo interface { type GrpcAccessInfo struct { ctx context.Context - info *grpc.UnaryServerInfo status *commonpb.Status req interface{} resp interface{} err error - start time.Time - end time.Time + grpcInfo *grpc.UnaryServerInfo + start time.Time + end time.Time } -func NewGrpcAccessInfo(ctx context.Context, info *grpc.UnaryServerInfo, req interface{}) *GrpcAccessInfo { +func NewGrpcAccessInfo(ctx context.Context, grpcInfo *grpc.UnaryServerInfo, req interface{}) *GrpcAccessInfo { accessInfo := &GrpcAccessInfo{ - ctx: ctx, - info: info, - req: req, - start: time.Now(), + ctx: ctx, + grpcInfo: grpcInfo, + req: req, + start: time.Now(), } return accessInfo @@ -137,7 +138,7 @@ func getTimeEnd(i *GrpcAccessInfo) string { } func getMethodName(i *GrpcAccessInfo) string { - _, methodName := path.Split(i.info.FullMethod) + _, methodName := path.Split(i.grpcInfo.FullMethod) return methodName } @@ -260,3 +261,11 @@ func getExpr(i *GrpcAccessInfo) string { } return expr.(string) } + +func getSdkVersion(i *GrpcAccessInfo) string { + clientInfo := connection.GetManager().Get(i.ctx) + if clientInfo == nil { + return unknownString + } + return clientInfo.SdkType + "-" + clientInfo.SdkVersion +} diff --git a/internal/proxy/accesslog/info_test.go b/internal/proxy/accesslog/info_test.go index a284212d88..7c9fbffe61 100644 --- a/internal/proxy/accesslog/info_test.go +++ b/internal/proxy/accesslog/info_test.go @@ -29,7 +29,9 @@ import ( "google.golang.org/grpc/peer" "google.golang.org/grpc/status" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/proxy/connection" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/crypto" "github.com/milvus-io/milvus/pkg/util/merr" @@ -63,8 +65,8 @@ func (s *GrpcAccessInfoSuite) SetupSuite() { } s.info = &GrpcAccessInfo{ - ctx: ctx, - info: serverinfo, + ctx: ctx, + grpcInfo: serverinfo, } } @@ -108,6 +110,26 @@ func (s *GrpcAccessInfoSuite) TestDbName() { s.Equal("test", result[0]) } +func (s *GrpcAccessInfoSuite) TestSdkInfo() { + ctx := context.Background() + s.info.ctx = ctx + result := s.info.Get("$sdk_version") + s.Equal(unknownString, result[0]) + + identifier := 11111 + md := metadata.MD{util.IdentifierKey: []string{fmt.Sprint(identifier)}} + ctx = metadata.NewIncomingContext(ctx, md) + info := &commonpb.ClientInfo{ + SdkType: "test", + SdkVersion: "1.0", + } + connection.GetManager().Register(ctx, int64(identifier), info) + + s.info.ctx = ctx + result = s.info.Get("$sdk_version") + s.Equal(info.SdkType+"-"+info.SdkVersion, result[0]) +} + func TestGrpcAccssInfo(t *testing.T) { suite.Run(t, new(GrpcAccessInfoSuite)) } diff --git a/internal/proxy/client_info.go b/internal/proxy/client_info.go deleted file mode 100644 index 5b60c92cce..0000000000 --- a/internal/proxy/client_info.go +++ /dev/null @@ -1,50 +0,0 @@ -package proxy - -import ( - "context" - "time" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/pkg/log" -) - -type clientInfo struct { - *commonpb.ClientInfo - identifier int64 - lastActiveTime time.Time -} - -func getLoggerOfClientInfo(info *commonpb.ClientInfo) []zap.Field { - fields := []zap.Field{ - zap.String("sdk_type", info.GetSdkType()), - zap.String("sdk_version", info.GetSdkVersion()), - zap.String("local_time", info.GetLocalTime()), - zap.String("user", info.GetUser()), - zap.String("host", info.GetHost()), - } - - for k, v := range info.GetReserved() { - fields = append(fields, zap.String(k, v)) - } - - return fields -} - -func (c *clientInfo) getLogger() []zap.Field { - fields := getLoggerOfClientInfo(c.ClientInfo) - fields = append(fields, - zap.Int64("identifier", c.identifier), - zap.Time("last_active_time", c.lastActiveTime), - ) - return fields -} - -func (c *clientInfo) ctxLogRegister(ctx context.Context) { - log.Ctx(ctx).Info("client register", c.getLogger()...) -} - -func (c *clientInfo) logDeregister() { - log.Info("client deregister", c.getLogger()...) -} diff --git a/internal/proxy/connection/client_info.go b/internal/proxy/connection/client_info.go new file mode 100644 index 0000000000..8c4f6a1486 --- /dev/null +++ b/internal/proxy/connection/client_info.go @@ -0,0 +1,24 @@ +package connection + +import ( + "time" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" +) + +type clientInfo struct { + *commonpb.ClientInfo + identifier int64 + lastActiveTime time.Time +} + +func (c *clientInfo) GetLogger() []zap.Field { + fields := ZapClientInfo(c.ClientInfo) + fields = append(fields, + zap.Int64("identifier", c.identifier), + zap.Time("last_active_time", c.lastActiveTime), + ) + return fields +} diff --git a/internal/proxy/connection/global.go b/internal/proxy/connection/global.go new file mode 100644 index 0000000000..bc07d2fc72 --- /dev/null +++ b/internal/proxy/connection/global.go @@ -0,0 +1,16 @@ +package connection + +import "sync" + +var connectionManagerInstance *connectionManager + +var getConnectionManagerInstanceOnce sync.Once + +func GetManager() *connectionManager { + getConnectionManagerInstanceOnce.Do(func() { + connectionManagerInstance = newConnectionManager( + withDuration(defaultConnCheckDuration), + withTTL(defaultTTLForInactiveConn)) + }) + return connectionManagerInstance +} diff --git a/internal/proxy/connection_manager.go b/internal/proxy/connection/manager.go similarity index 80% rename from internal/proxy/connection_manager.go rename to internal/proxy/connection/manager.go index de47517fb1..d298914bd3 100644 --- a/internal/proxy/connection_manager.go +++ b/internal/proxy/connection/manager.go @@ -1,4 +1,4 @@ -package proxy +package connection import ( "context" @@ -61,7 +61,7 @@ func (s *connectionManager) init() { }) } -func (s *connectionManager) stop() { +func (s *connectionManager) Stop() { s.stopOnce.Do(func() { close(s.closeSignal) s.wg.Wait() @@ -80,14 +80,14 @@ func (s *connectionManager) checkLoop() { log.Info("connection manager closed") return case identifier := <-s.buffer: - s.update(identifier) + s.Update(identifier) case <-t.C: s.removeLongInactiveClients() } } } -func (s *connectionManager) register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) { +func (s *connectionManager) Register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) { cli := clientInfo{ ClientInfo: info, identifier: identifier, @@ -98,38 +98,15 @@ func (s *connectionManager) register(ctx context.Context, identifier int64, info defer s.mu.Unlock() s.clientInfos[identifier] = cli - cli.ctxLogRegister(ctx) + log.Ctx(ctx).Info("client register", cli.GetLogger()...) } -func (s *connectionManager) keepActive(identifier int64) { +func (s *connectionManager) KeepActive(identifier int64) { // make this asynchronous and then the rpc won't be blocked too long. s.buffer <- identifier } -func (s *connectionManager) update(identifier int64) { - s.mu.Lock() - defer s.mu.Unlock() - - cli, ok := s.clientInfos[identifier] - if ok { - cli.lastActiveTime = time.Now() - s.clientInfos[identifier] = cli - } -} - -func (s *connectionManager) removeLongInactiveClients() { - s.mu.Lock() - defer s.mu.Unlock() - - for candidate, cli := range s.clientInfos { - if time.Since(cli.lastActiveTime) > s.ttl { - cli.logDeregister() - delete(s.clientInfos, candidate) - } - } -} - -func (s *connectionManager) list() []*commonpb.ClientInfo { +func (s *connectionManager) List() []*commonpb.ClientInfo { s.mu.RLock() defer s.mu.RUnlock() @@ -151,6 +128,44 @@ func (s *connectionManager) list() []*commonpb.ClientInfo { return clients } +func (s *connectionManager) Get(ctx context.Context) *commonpb.ClientInfo { + s.mu.RLock() + defer s.mu.RUnlock() + identifier, err := GetIdentifierFromContext(ctx) + if err != nil { + return nil + } + + cli, ok := s.clientInfos[identifier] + if !ok { + return nil + } + return cli.ClientInfo +} + +func (s *connectionManager) Update(identifier int64) { + s.mu.Lock() + defer s.mu.Unlock() + + cli, ok := s.clientInfos[identifier] + if ok { + cli.lastActiveTime = time.Now() + s.clientInfos[identifier] = cli + } +} + +func (s *connectionManager) removeLongInactiveClients() { + s.mu.Lock() + defer s.mu.Unlock() + + for candidate, cli := range s.clientInfos { + if time.Since(cli.lastActiveTime) > s.ttl { + log.Info("client deregister", cli.GetLogger()...) + delete(s.clientInfos, candidate) + } + } +} + func newConnectionManager(opts ...connectionManagerOption) *connectionManager { s := &connectionManager{ closeSignal: make(chan struct{}, 1), @@ -164,16 +179,3 @@ func newConnectionManager(opts ...connectionManagerOption) *connectionManager { return s } - -var connectionManagerInstance *connectionManager - -var getConnectionManagerInstanceOnce sync.Once - -func GetConnectionManager() *connectionManager { - getConnectionManagerInstanceOnce.Do(func() { - connectionManagerInstance = newConnectionManager( - withDuration(defaultConnCheckDuration), - withTTL(defaultTTLForInactiveConn)) - }) - return connectionManagerInstance -} diff --git a/internal/proxy/connection_manager_test.go b/internal/proxy/connection/manager_test.go similarity index 74% rename from internal/proxy/connection_manager_test.go rename to internal/proxy/connection/manager_test.go index 25f8c98b27..110569ed85 100644 --- a/internal/proxy/connection_manager_test.go +++ b/internal/proxy/connection/manager_test.go @@ -1,4 +1,4 @@ -package proxy +package connection import ( "context" @@ -32,7 +32,7 @@ func Test_connectionManager_apply(t *testing.T) { } func TestGetConnectionManager(t *testing.T) { - s := GetConnectionManager() + s := GetManager() assert.Equal(t, defaultConnCheckDuration, s.duration) assert.Equal(t, defaultTTLForInactiveConn, s.ttl) } @@ -42,28 +42,28 @@ func TestConnectionManager(t *testing.T) { withDuration(time.Millisecond*5), withTTL(time.Millisecond*100)) - s.register(context.TODO(), 1, &commonpb.ClientInfo{ + s.Register(context.TODO(), 1, &commonpb.ClientInfo{ Reserved: map[string]string{"for_test": "for_test"}, }) - assert.Equal(t, 1, len(s.list())) + assert.Equal(t, 1, len(s.List())) // register duplicate. - s.register(context.TODO(), 1, &commonpb.ClientInfo{}) - assert.Equal(t, 1, len(s.list())) + s.Register(context.TODO(), 1, &commonpb.ClientInfo{}) + assert.Equal(t, 1, len(s.List())) - s.register(context.TODO(), 2, &commonpb.ClientInfo{}) - assert.Equal(t, 2, len(s.list())) + s.Register(context.TODO(), 2, &commonpb.ClientInfo{}) + assert.Equal(t, 2, len(s.List())) - s.keepActive(1) - s.keepActive(2) + s.KeepActive(1) + s.KeepActive(2) time.Sleep(time.Millisecond * 5) - assert.Equal(t, 2, len(s.list())) + assert.Equal(t, 2, len(s.List())) time.Sleep(time.Millisecond * 100) - assert.Equal(t, 0, len(s.list())) + assert.Equal(t, 0, len(s.List())) - s.stop() + s.Stop() time.Sleep(time.Millisecond * 5) } diff --git a/internal/proxy/keep_active_interceptor.go b/internal/proxy/connection/util.go similarity index 64% rename from internal/proxy/keep_active_interceptor.go rename to internal/proxy/connection/util.go index 1da72d7072..50d7e5118f 100644 --- a/internal/proxy/keep_active_interceptor.go +++ b/internal/proxy/connection/util.go @@ -1,18 +1,36 @@ -package proxy +package connection import ( "context" "fmt" "strconv" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/funcutil" ) -func getIdentifierFromContext(ctx context.Context) (int64, error) { +func ZapClientInfo(info *commonpb.ClientInfo) []zap.Field { + fields := []zap.Field{ + zap.String("sdk_type", info.GetSdkType()), + zap.String("sdk_version", info.GetSdkVersion()), + zap.String("local_time", info.GetLocalTime()), + zap.String("user", info.GetUser()), + zap.String("host", info.GetHost()), + } + + for k, v := range info.GetReserved() { + fields = append(fields, zap.String(k, v)) + } + + return fields +} + +func GetIdentifierFromContext(ctx context.Context) (int64, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { return 0, fmt.Errorf("fail to get metadata from the context") @@ -33,9 +51,9 @@ func KeepActiveInterceptor(ctx context.Context, req any, info *grpc.UnaryServerI // On the other hand, too many goroutines will also influence the rpc. // Not sure which way is better, since actually we already make the `keepActive` asynchronous. go func() { - identifier, err := getIdentifierFromContext(ctx) + identifier, err := GetIdentifierFromContext(ctx) if err == nil && funcutil.CheckCtxValid(ctx) { - GetConnectionManager().keepActive(identifier) + GetManager().KeepActive(identifier) } }() diff --git a/internal/proxy/keep_active_interceptor_test.go b/internal/proxy/connection/util_test.go similarity index 88% rename from internal/proxy/keep_active_interceptor_test.go rename to internal/proxy/connection/util_test.go index 3de5e19f0b..66bd7ee298 100644 --- a/internal/proxy/keep_active_interceptor_test.go +++ b/internal/proxy/connection/util_test.go @@ -1,4 +1,4 @@ -package proxy +package connection import ( "context" @@ -12,14 +12,14 @@ import ( func Test_getIdentifierFromContext(t *testing.T) { t.Run("metadata not found", func(t *testing.T) { ctx := context.TODO() - _, err := getIdentifierFromContext(ctx) + _, err := GetIdentifierFromContext(ctx) assert.Error(t, err) }) t.Run("no identifier", func(t *testing.T) { md := metadata.New(map[string]string{}) ctx := metadata.NewIncomingContext(context.TODO(), md) - _, err := getIdentifierFromContext(ctx) + _, err := GetIdentifierFromContext(ctx) assert.Error(t, err) }) @@ -28,7 +28,7 @@ func Test_getIdentifierFromContext(t *testing.T) { "identifier": "i-am-not-invalid-identifier", }) ctx := metadata.NewIncomingContext(context.TODO(), md) - _, err := getIdentifierFromContext(ctx) + _, err := GetIdentifierFromContext(ctx) assert.Error(t, err) }) @@ -37,7 +37,7 @@ func Test_getIdentifierFromContext(t *testing.T) { "identifier": "20230518", }) ctx := metadata.NewIncomingContext(context.TODO(), md) - identifier, err := getIdentifierFromContext(ctx) + identifier, err := GetIdentifierFromContext(ctx) assert.NoError(t, err) assert.Equal(t, int64(20230518), identifier) }) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 036386a7dc..61c4c20c18 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -41,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/proxy/connection" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -4982,7 +4983,7 @@ func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest } db := GetCurDBNameFromContextOrDefault(ctx) - logsToBePrinted := append(getLoggerOfClientInfo(request.GetClientInfo()), zap.String("db", db)) + logsToBePrinted := append(connection.ZapClientInfo(request.GetClientInfo()), zap.String("db", db)) log := log.Ctx(ctx).With(logsToBePrinted...) log.Info("connect received") @@ -5027,7 +5028,7 @@ func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest Reserved: make(map[string]string), } - GetConnectionManager().register(ctx, int64(ts), request.GetClientInfo()) + connection.GetManager().Register(ctx, int64(ts), request.GetClientInfo()) return &milvuspb.ConnectResponse{ Status: merr.Success(), @@ -5139,8 +5140,7 @@ func (node *Proxy) ListClientInfos(ctx context.Context, req *proxypb.ListClientI return &proxypb.ListClientInfosResponse{Status: merr.Status(err)}, nil } - clients := GetConnectionManager().list() - + clients := connection.GetManager().List() return &proxypb.ListClientInfosResponse{ Status: merr.Success(), ClientInfos: clients, diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 51426af2d8..7439c27ae0 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proxy/accesslog" + "github.com/milvus-io/milvus/internal/proxy/connection" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -476,8 +477,7 @@ func (node *Proxy) Stop() error { // https://github.com/milvus-io/milvus/issues/12282 node.UpdateStateCode(commonpb.StateCode_Abnormal) - GetConnectionManager().stop() - + connection.GetManager().Stop() return nil }