From 94c15a49e98d7126c205006aa97eb7b537a94685 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Fri, 21 Oct 2022 14:41:28 +0800 Subject: [PATCH] Can't drop loaded partition (#19938) Signed-off-by: cai.zhang Signed-off-by: cai.zhang --- internal/proxy/impl.go | 1 + internal/proxy/query_coord_mock_test.go | 6 + internal/proxy/task.go | 24 +++- internal/proxy/task_index.go | 16 +-- internal/proxy/task_test.go | 21 ++- internal/proxy/util.go | 54 ++++++++ internal/proxy/util_test.go | 131 ++++++++++++++++++- tests/python_client/testcases/test_search.py | 2 +- 8 files changed, 226 insertions(+), 29 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 2f7570d2de..9705613634 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1191,6 +1191,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart Condition: NewTaskCondition(ctx), DropPartitionRequest: request, rootCoord: node.rootCoord, + queryCoord: node.queryCoord, result: nil, } diff --git a/internal/proxy/query_coord_mock_test.go b/internal/proxy/query_coord_mock_test.go index ce3d542a02..10d674b197 100644 --- a/internal/proxy/query_coord_mock_test.go +++ b/internal/proxy/query_coord_mock_test.go @@ -46,6 +46,12 @@ func SetQueryCoordShowCollectionsFunc(f queryCoordShowCollectionsFuncType) Query } } +func SetQueryCoordShowPartitionsFunc(f queryCoordShowPartitionsFuncType) QueryCoordMockOption { + return func(mock *QueryCoordMock) { + mock.showPartitionsFunc = f + } +} + func withValidShardLeaders() QueryCoordMockOption { return func(mock *QueryCoordMock) { mock.validShardLeaders = true diff --git a/internal/proxy/task.go b/internal/proxy/task.go index d8a221b891..759c18dab3 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -800,9 +800,10 @@ func (cpt *createPartitionTask) PostExecute(ctx context.Context) error { type dropPartitionTask struct { Condition *milvuspb.DropPartitionRequest - ctx context.Context - rootCoord types.RootCoord - result *commonpb.Status + ctx context.Context + rootCoord types.RootCoord + queryCoord types.QueryCoord + result *commonpb.Status } func (dpt *dropPartitionTask) TraceCtx() context.Context { @@ -856,6 +857,23 @@ func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error { return err } + collID, _ := globalMetaCache.GetCollectionID(ctx, dpt.GetCollectionName()) + partID, _ := globalMetaCache.GetPartitionID(ctx, dpt.GetCollectionName(), dpt.GetPartitionName()) + + collLoaded, err := isCollectionLoaded(ctx, dpt.queryCoord, []int64{collID}) + if err != nil { + return err + } + if collLoaded { + loaded, err := isPartitionLoaded(ctx, dpt.queryCoord, collID, []int64{partID}) + if err != nil { + return err + } + if loaded { + return errors.New("partition cannot be dropped, partition is loaded, please release it first") + } + } + return nil } diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 3c3e15f836..261f562358 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/indexpb" - "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/funcutil" @@ -499,24 +498,11 @@ func (dit *dropIndexTask) PreExecute(ctx context.Context) error { collID, _ := globalMetaCache.GetCollectionID(ctx, dit.CollectionName) dit.collectionID = collID - // get all loading collections - resp, err := dit.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{ - CollectionIDs: nil, - }) + loaded, err := isCollectionLoaded(ctx, dit.queryCoord, []int64{collID}) if err != nil { return err } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(resp.Status.Reason) - } - loaded := false - for _, loadedCollID := range resp.GetCollectionIDs() { - if collID == loadedCollID { - loaded = true - break - } - } if loaded { return errors.New("index cannot be dropped, collection is loaded, please release it first") } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 2c94e4b0e5..c83f617cea 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -27,6 +27,8 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/mocks" @@ -1091,6 +1093,18 @@ func TestDropPartitionTask(t *testing.T) { collectionName := prefix + funcutil.GenRandomStr() partitionName := prefix + funcutil.GenRandomStr() + showPartitionsMock := func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { + return &querypb.ShowPartitionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + PartitionIDs: []int64{}, + }, nil + } + qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock)) + qc.updateState(commonpb.StateCode_Healthy) + task := &dropPartitionTask{ Condition: NewTaskCondition(ctx), DropPartitionRequest: &milvuspb.DropPartitionRequest{ @@ -1103,9 +1117,10 @@ func TestDropPartitionTask(t *testing.T) { CollectionName: collectionName, PartitionName: partitionName, }, - ctx: ctx, - rootCoord: rc, - result: nil, + ctx: ctx, + rootCoord: rc, + queryCoord: qc, + result: nil, } task.PreExecute(ctx) diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 715c7ba2b2..e3a13a4d18 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -24,6 +24,9 @@ import ( "strings" "time" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/types" + "go.uber.org/zap" "golang.org/x/crypto/bcrypt" "google.golang.org/grpc/metadata" @@ -832,3 +835,54 @@ func validateIndexName(indexName string) error { } return nil } + +func isCollectionLoaded(ctx context.Context, qc types.QueryCoord, collIDs []int64) (bool, error) { + // get all loading collections + resp, err := qc.ShowCollections(ctx, &querypb.ShowCollectionsRequest{ + CollectionIDs: nil, + }) + if err != nil { + return false, err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + return false, errors.New(resp.Status.Reason) + } + + loaded := false +LOOP: + for _, loadedCollID := range resp.GetCollectionIDs() { + for _, collID := range collIDs { + if collID == loadedCollID { + loaded = true + break LOOP + } + } + } + return loaded, nil +} + +func isPartitionLoaded(ctx context.Context, qc types.QueryCoord, collIDs int64, partIDs []int64) (bool, error) { + // get all loading collections + resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{ + CollectionID: collIDs, + PartitionIDs: nil, + }) + if err != nil { + return false, err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + return false, errors.New(resp.Status.Reason) + } + + loaded := false +LOOP: + for _, loadedPartID := range resp.GetPartitionIDs() { + for _, partID := range partIDs { + if partID == loadedPartID { + loaded = true + break LOOP + } + } + } + return loaded, nil +} diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 432a78e5f0..8c59f59156 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -18,25 +18,25 @@ package proxy import ( "context" + "errors" "fmt" "strconv" "strings" "testing" "time" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/metadata" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" - + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/crypto" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" - - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/rootcoordpb" - - "github.com/stretchr/testify/assert" - "google.golang.org/grpc/metadata" ) func TestValidateCollectionName(t *testing.T) { @@ -809,3 +809,120 @@ func TestValidateTravelTimestamp(t *testing.T) { }) } } + +func Test_isCollectionIsLoaded(t *testing.T) { + ctx := context.Background() + t.Run("normal", func(t *testing.T) { + collID := int64(1) + showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { + return &querypb.ShowCollectionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + CollectionIDs: []int64{collID, 10, 100}, + }, nil + } + qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock)) + qc.updateState(commonpb.StateCode_Healthy) + loaded, err := isCollectionLoaded(ctx, qc, []int64{collID}) + assert.NoError(t, err) + assert.True(t, loaded) + }) + + t.Run("error", func(t *testing.T) { + collID := int64(1) + showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { + return &querypb.ShowCollectionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + CollectionIDs: []int64{collID}, + }, errors.New("error") + } + qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock)) + qc.updateState(commonpb.StateCode_Healthy) + loaded, err := isCollectionLoaded(ctx, qc, []int64{collID}) + assert.Error(t, err) + assert.False(t, loaded) + }) + + t.Run("fail", func(t *testing.T) { + collID := int64(1) + showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { + return &querypb.ShowCollectionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "fail reason", + }, + CollectionIDs: []int64{collID}, + }, nil + } + qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock)) + qc.updateState(commonpb.StateCode_Healthy) + loaded, err := isCollectionLoaded(ctx, qc, []int64{collID}) + assert.Error(t, err) + assert.False(t, loaded) + }) +} + +func Test_isPartitionIsLoaded(t *testing.T) { + ctx := context.Background() + t.Run("normal", func(t *testing.T) { + collID := int64(1) + partID := int64(2) + showPartitionsMock := func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { + return &querypb.ShowPartitionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + PartitionIDs: []int64{partID}, + }, nil + } + qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock)) + qc.updateState(commonpb.StateCode_Healthy) + loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID}) + assert.NoError(t, err) + assert.True(t, loaded) + }) + + t.Run("error", func(t *testing.T) { + collID := int64(1) + partID := int64(2) + showPartitionsMock := func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { + return &querypb.ShowPartitionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + PartitionIDs: []int64{partID}, + }, errors.New("error") + } + qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock)) + qc.updateState(commonpb.StateCode_Healthy) + loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID}) + assert.Error(t, err) + assert.False(t, loaded) + }) + + t.Run("fail", func(t *testing.T) { + collID := int64(1) + partID := int64(2) + showPartitionsMock := func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { + return &querypb.ShowPartitionsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "fail reason", + }, + PartitionIDs: []int64{partID}, + }, nil + } + qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock)) + qc.updateState(commonpb.StateCode_Healthy) + loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID}) + assert.Error(t, err) + assert.False(t, loaded) + }) +} diff --git a/tests/python_client/testcases/test_search.py b/tests/python_client/testcases/test_search.py index aa47d5e72a..474a7a32b3 100644 --- a/tests/python_client/testcases/test_search.py +++ b/tests/python_client/testcases/test_search.py @@ -664,7 +664,7 @@ class TestCollectionSearchInvalid(TestcaseBase): """ # 1. initialize with data partition_num = 1 - collection_w = self.init_collection_general(prefix, True, 1000, partition_num)[0] + collection_w = self.init_collection_general(prefix, True, 1000, partition_num, is_index=True)[0] # 2. delete partitions log.info("test_search_partition_deleted: deleting a partition") par = collection_w.partitions