Enhance FlushAll/GetFlushAllState API (#26802)

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/26670/head
wayblink 2023-09-06 10:35:48 +08:00 committed by GitHub
parent 78a2638fd4
commit d7b6f3a9e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 171 additions and 19 deletions

6
go.mod
View File

@ -3,7 +3,6 @@ module github.com/milvus-io/milvus
go 1.18
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/aliyun/credentials-go v1.2.7
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e
github.com/antonmedv/expr v1.8.9
@ -22,7 +21,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.16.5
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230905091144-d8ce91954095
github.com/milvus-io/milvus/pkg v0.0.1
github.com/minio/minio-go/v7 v7.0.56
github.com/prometheus/client_golang v1.14.0
@ -53,7 +52,6 @@ require (
golang.org/x/text v0.9.0
google.golang.org/grpc v1.54.0
google.golang.org/grpc/examples v0.0.0-20220617181431-3e7b97febc7f
gorm.io/gorm v1.23.8
stathat.com/c/consistent v1.0.0
)
@ -118,8 +116,6 @@ require (
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/asmfmt v1.3.1 // indirect

13
go.sum
View File

@ -56,8 +56,6 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno=
github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo=
github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
@ -468,11 +466,6 @@ github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=
@ -575,8 +568,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.0 h1:t5CKm7+FXuD2rDLv/H8tpN9iY8F2dZvHF87xWBx8muU=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.0/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230905091144-d8ce91954095 h1:PyN9bVl/joOroIZKizJlDaI4wa9Zd84P6nNDm/wYUgY=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230905091144-d8ce91954095/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
@ -1447,8 +1440,6 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/gorm v1.23.8 h1:h8sGJ+biDgBA1AD1Ha9gFCx7h8npU7AsLdlkX0n2TpE=
gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -3557,6 +3557,82 @@ func TestGetFlushAllState(t *testing.T) {
}
}
func TestGetFlushAllStateWithDB(t *testing.T) {
tests := []struct {
testName string
FlushAllTs Timestamp
DbExist bool
ExpectedSuccess bool
ExpectedFlushed bool
}{
{"test FlushAllWithDB, db exist", 99, true, true, true},
{"test FlushAllWithDB, db not exist", 99, false, false, false},
}
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
collectionID := UniqueID(0)
dbName := "db"
collectionName := "collection"
vchannels := []string{"mock-vchannel-0", "mock-vchannel-1"}
svr := &Server{}
svr.stateCode.Store(commonpb.StateCode_Healthy)
var err error
svr.meta = &meta{}
svr.rootCoordClient = mocks.NewRootCoord(t)
svr.broker = NewCoordinatorBroker(svr.rootCoordClient)
if test.DbExist {
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything).
Return(&milvuspb.ListDatabasesResponse{
DbNames: []string{dbName},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
}, nil).Maybe()
} else {
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything).
Return(&milvuspb.ListDatabasesResponse{
DbNames: []string{},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
}, nil).Maybe()
}
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ShowCollections(mock.Anything, mock.Anything).
Return(&milvuspb.ShowCollectionsResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
CollectionIds: []int64{collectionID},
}, nil).Maybe()
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
VirtualChannelNames: vchannels,
CollectionID: collectionID,
CollectionName: collectionName,
}, nil).Maybe()
svr.meta.channelCPs = make(map[string]*msgpb.MsgPosition)
channelCPs := []Timestamp{100, 200}
for i, ts := range channelCPs {
channel := vchannels[i]
svr.meta.channelCPs[channel] = &msgpb.MsgPosition{
ChannelName: channel,
Timestamp: ts,
}
}
var resp *milvuspb.GetFlushAllStateResponse
resp, err = svr.GetFlushAllState(context.TODO(), &milvuspb.GetFlushAllStateRequest{FlushAllTs: test.FlushAllTs, DbName: dbName})
assert.NoError(t, err)
if test.ExpectedSuccess {
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
} else {
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
}
assert.Equal(t, test.ExpectedFlushed, resp.GetFlushed())
})
}
}
func TestDataCoordServer_SetSegmentState(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)

View File

@ -1294,6 +1294,16 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll
resp.Status.Reason = err.Error()
return resp, nil
}
dbNames := dbsRsp.DbNames
if req.GetDbName() != "" {
dbNames = lo.Filter(dbNames, func(dbName string, _ int) bool {
return dbName == req.GetDbName()
})
if len(dbNames) == 0 {
resp.Status.Reason = merr.WrapErrDatabaseNotFound(req.GetDbName()).Error()
return resp, nil
}
}
for _, dbName := range dbsRsp.DbNames {
showColRsp, err := s.broker.ShowCollections(ctx, dbName)

View File

@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -2952,9 +2953,10 @@ func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDista
}
// FlushAll notifies Proxy to flush all collection's DML messages.
func (node *Proxy) FlushAll(ctx context.Context, _ *milvuspb.FlushAllRequest) (*milvuspb.FlushAllResponse, error) {
func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest) (*milvuspb.FlushAllResponse, error) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-FlushAll")
defer sp.End()
log := log.With(zap.String("db", req.GetDbName()))
resp := &milvuspb.FlushAllResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
@ -2985,8 +2987,18 @@ func (node *Proxy) FlushAll(ctx context.Context, _ *milvuspb.FlushAllRequest) (*
if hasError(dbsRsp.GetStatus(), err) {
return resp, nil
}
dbNames := dbsRsp.DbNames
if req.GetDbName() != "" {
dbNames = lo.Filter(dbNames, func(dbName string, _ int) bool {
return dbName == req.GetDbName()
})
if len(dbNames) == 0 {
resp.Status.Reason = fmt.Sprintf("failed to get db %s", req.GetDbName())
return resp, nil
}
}
for _, dbName := range dbsRsp.DbNames {
for _, dbName := range dbNames {
// Flush all collections to accelerate the flushAll progress
showColRsp, err := node.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections)),
@ -3635,7 +3647,8 @@ func (node *Proxy) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushA
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetFlushAllState")
defer sp.End()
log := log.Ctx(ctx).With(zap.Uint64("FlushAllTs", req.GetFlushAllTs()),
zap.Time("FlushAllTime", tsoutil.PhysicalTime(req.GetFlushAllTs())))
zap.Time("FlushAllTime", tsoutil.PhysicalTime(req.GetFlushAllTs())),
zap.String("db", req.GetDbName()))
log.Debug("receive GetFlushAllState request")
var err error

View File

@ -363,6 +363,72 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
})
}
func TestProxy_FlushAll_DbCollection(t *testing.T) {
tests := []struct {
testName string
FlushRequest *milvuspb.FlushAllRequest
ExpectedSuccess bool
}{
{"flushAll", &milvuspb.FlushAllRequest{}, true},
{"flushAll set db", &milvuspb.FlushAllRequest{DbName: "default"}, true},
{"flushAll set db, db not exist", &milvuspb.FlushAllRequest{DbName: "default2"}, false},
}
for _, test := range tests {
factory := dependency.NewDefaultFactory(true)
ctx := context.Background()
paramtable.Init()
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
Params.Save(Params.ProxyCfg.MaxTaskNum.Key, "1000")
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
assert.NoError(t, err)
err = node.sched.Start()
assert.NoError(t, err)
defer node.sched.Close()
node.dataCoord = mocks.NewMockDataCoord(t)
node.rootCoord = mocks.NewRootCoord(t)
successStatus := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
// set expectations
cache := NewMockCache(t)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(UniqueID(0), nil).Maybe()
cache.On("RemoveDatabase",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
).Maybe()
globalMetaCache = cache
node.dataCoord.(*mocks.MockDataCoord).EXPECT().Flush(mock.Anything, mock.Anything).
Return(&datapb.FlushResponse{Status: successStatus}, nil).Maybe()
node.rootCoord.(*mocks.RootCoord).EXPECT().ShowCollections(mock.Anything, mock.Anything).
Return(&milvuspb.ShowCollectionsResponse{Status: successStatus, CollectionNames: []string{"col-0"}}, nil).Maybe()
node.rootCoord.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything).
Return(&milvuspb.ListDatabasesResponse{Status: successStatus, DbNames: []string{"default"}}, nil).Maybe()
t.Run(test.testName, func(t *testing.T) {
resp, err := node.FlushAll(ctx, test.FlushRequest)
assert.NoError(t, err)
if test.ExpectedSuccess {
assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
} else {
assert.NotEqual(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
}
})
}
}
func TestProxy_FlushAll(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
ctx := context.Background()