diff --git a/go.mod b/go.mod index 96c2bf491a..57057d545f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/milvus-io/milvus go 1.18 require ( - github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/aliyun/credentials-go v1.2.7 github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e github.com/antonmedv/expr v1.8.9 @@ -22,7 +21,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.16.5 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.0 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230905091144-d8ce91954095 github.com/milvus-io/milvus/pkg v0.0.1 github.com/minio/minio-go/v7 v7.0.56 github.com/prometheus/client_golang v1.14.0 @@ -53,7 +52,6 @@ require ( golang.org/x/text v0.9.0 google.golang.org/grpc v1.54.0 google.golang.org/grpc/examples v0.0.0-20220617181431-3e7b97febc7f - gorm.io/gorm v1.23.8 stathat.com/c/consistent v1.0.0 ) @@ -118,8 +116,6 @@ require ( github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect - github.com/jinzhu/inflection v1.0.0 // indirect - github.com/jinzhu/now v1.1.5 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.1 // indirect diff --git a/go.sum b/go.sum index 3ed9362bb2..b77a3c341c 100644 --- a/go.sum +++ b/go.sum @@ -56,8 +56,6 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno= github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= -github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= -github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= @@ -468,11 +466,6 @@ github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+ github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= -github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= -github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= -github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= -github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= -github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= @@ -575,8 +568,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.0 h1:t5CKm7+FXuD2rDLv/H8tpN9iY8F2dZvHF87xWBx8muU= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.0/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230905091144-d8ce91954095 h1:PyN9bVl/joOroIZKizJlDaI4wa9Zd84P6nNDm/wYUgY= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230905091144-d8ce91954095/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= @@ -1447,8 +1440,6 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gorm.io/gorm v1.23.8 h1:h8sGJ+biDgBA1AD1Ha9gFCx7h8npU7AsLdlkX0n2TpE= -gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index b730c5b8e8..1696b99aa4 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3557,6 +3557,82 @@ func TestGetFlushAllState(t *testing.T) { } } +func TestGetFlushAllStateWithDB(t *testing.T) { + tests := []struct { + testName string + FlushAllTs Timestamp + DbExist bool + ExpectedSuccess bool + ExpectedFlushed bool + }{ + {"test FlushAllWithDB, db exist", 99, true, true, true}, + {"test FlushAllWithDB, db not exist", 99, false, false, false}, + } + for _, test := range tests { + t.Run(test.testName, func(t *testing.T) { + collectionID := UniqueID(0) + dbName := "db" + collectionName := "collection" + vchannels := []string{"mock-vchannel-0", "mock-vchannel-1"} + + svr := &Server{} + svr.stateCode.Store(commonpb.StateCode_Healthy) + var err error + svr.meta = &meta{} + svr.rootCoordClient = mocks.NewRootCoord(t) + svr.broker = NewCoordinatorBroker(svr.rootCoordClient) + + if test.DbExist { + svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything). + Return(&milvuspb.ListDatabasesResponse{ + DbNames: []string{dbName}, + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + }, nil).Maybe() + } else { + svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything). + Return(&milvuspb.ListDatabasesResponse{ + DbNames: []string{}, + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + }, nil).Maybe() + } + + svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ShowCollections(mock.Anything, mock.Anything). + Return(&milvuspb.ShowCollectionsResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + CollectionIds: []int64{collectionID}, + }, nil).Maybe() + + svr.rootCoordClient.(*mocks.RootCoord).EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything). + Return(&milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + VirtualChannelNames: vchannels, + CollectionID: collectionID, + CollectionName: collectionName, + }, nil).Maybe() + + svr.meta.channelCPs = make(map[string]*msgpb.MsgPosition) + channelCPs := []Timestamp{100, 200} + for i, ts := range channelCPs { + channel := vchannels[i] + svr.meta.channelCPs[channel] = &msgpb.MsgPosition{ + ChannelName: channel, + Timestamp: ts, + } + } + + var resp *milvuspb.GetFlushAllStateResponse + resp, err = svr.GetFlushAllState(context.TODO(), &milvuspb.GetFlushAllStateRequest{FlushAllTs: test.FlushAllTs, DbName: dbName}) + assert.NoError(t, err) + if test.ExpectedSuccess { + assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + } else { + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + } + assert.Equal(t, test.ExpectedFlushed, resp.GetFlushed()) + }) + } +} + func TestDataCoordServer_SetSegmentState(t *testing.T) { t.Run("normal case", func(t *testing.T) { svr := newTestServer(t, nil) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b8692f8bd6..4a66de8b61 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1294,6 +1294,16 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll resp.Status.Reason = err.Error() return resp, nil } + dbNames := dbsRsp.DbNames + if req.GetDbName() != "" { + dbNames = lo.Filter(dbNames, func(dbName string, _ int) bool { + return dbName == req.GetDbName() + }) + if len(dbNames) == 0 { + resp.Status.Reason = merr.WrapErrDatabaseNotFound(req.GetDbName()).Error() + return resp, nil + } + } for _, dbName := range dbsRsp.DbNames { showColRsp, err := s.broker.ShowCollections(ctx, dbName) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index da71472c85..cf53c92569 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "go.opentelemetry.io/otel" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -2952,9 +2953,10 @@ func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDista } // FlushAll notifies Proxy to flush all collection's DML messages. -func (node *Proxy) FlushAll(ctx context.Context, _ *milvuspb.FlushAllRequest) (*milvuspb.FlushAllResponse, error) { +func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest) (*milvuspb.FlushAllResponse, error) { ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-FlushAll") defer sp.End() + log := log.With(zap.String("db", req.GetDbName())) resp := &milvuspb.FlushAllResponse{ Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, @@ -2985,8 +2987,18 @@ func (node *Proxy) FlushAll(ctx context.Context, _ *milvuspb.FlushAllRequest) (* if hasError(dbsRsp.GetStatus(), err) { return resp, nil } + dbNames := dbsRsp.DbNames + if req.GetDbName() != "" { + dbNames = lo.Filter(dbNames, func(dbName string, _ int) bool { + return dbName == req.GetDbName() + }) + if len(dbNames) == 0 { + resp.Status.Reason = fmt.Sprintf("failed to get db %s", req.GetDbName()) + return resp, nil + } + } - for _, dbName := range dbsRsp.DbNames { + for _, dbName := range dbNames { // Flush all collections to accelerate the flushAll progress showColRsp, err := node.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections)), @@ -3635,7 +3647,8 @@ func (node *Proxy) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushA ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetFlushAllState") defer sp.End() log := log.Ctx(ctx).With(zap.Uint64("FlushAllTs", req.GetFlushAllTs()), - zap.Time("FlushAllTime", tsoutil.PhysicalTime(req.GetFlushAllTs()))) + zap.Time("FlushAllTime", tsoutil.PhysicalTime(req.GetFlushAllTs())), + zap.String("db", req.GetDbName())) log.Debug("receive GetFlushAllState request") var err error diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index cb44832b53..ca02c7dc91 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -363,6 +363,72 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { }) } +func TestProxy_FlushAll_DbCollection(t *testing.T) { + tests := []struct { + testName string + FlushRequest *milvuspb.FlushAllRequest + ExpectedSuccess bool + }{ + {"flushAll", &milvuspb.FlushAllRequest{}, true}, + {"flushAll set db", &milvuspb.FlushAllRequest{DbName: "default"}, true}, + {"flushAll set db, db not exist", &milvuspb.FlushAllRequest{DbName: "default2"}, false}, + } + for _, test := range tests { + factory := dependency.NewDefaultFactory(true) + ctx := context.Background() + paramtable.Init() + + node, err := NewProxy(ctx, factory) + assert.NoError(t, err) + node.stateCode.Store(commonpb.StateCode_Healthy) + node.tsoAllocator = ×tampAllocator{ + tso: newMockTimestampAllocatorInterface(), + } + + Params.Save(Params.ProxyCfg.MaxTaskNum.Key, "1000") + node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) + assert.NoError(t, err) + err = node.sched.Start() + assert.NoError(t, err) + defer node.sched.Close() + node.dataCoord = mocks.NewMockDataCoord(t) + node.rootCoord = mocks.NewRootCoord(t) + successStatus := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success} + + // set expectations + cache := NewMockCache(t) + cache.On("GetCollectionID", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + ).Return(UniqueID(0), nil).Maybe() + + cache.On("RemoveDatabase", + mock.Anything, // context.Context + mock.AnythingOfType("string"), + ).Maybe() + + globalMetaCache = cache + + node.dataCoord.(*mocks.MockDataCoord).EXPECT().Flush(mock.Anything, mock.Anything). + Return(&datapb.FlushResponse{Status: successStatus}, nil).Maybe() + node.rootCoord.(*mocks.RootCoord).EXPECT().ShowCollections(mock.Anything, mock.Anything). + Return(&milvuspb.ShowCollectionsResponse{Status: successStatus, CollectionNames: []string{"col-0"}}, nil).Maybe() + node.rootCoord.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything). + Return(&milvuspb.ListDatabasesResponse{Status: successStatus, DbNames: []string{"default"}}, nil).Maybe() + + t.Run(test.testName, func(t *testing.T) { + resp, err := node.FlushAll(ctx, test.FlushRequest) + assert.NoError(t, err) + if test.ExpectedSuccess { + assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) + } else { + assert.NotEqual(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) + } + }) + } +} + func TestProxy_FlushAll(t *testing.T) { factory := dependency.NewDefaultFactory(true) ctx := context.Background()