diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index d2a344b012..3609d03313 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -598,6 +598,14 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { UpdateTimestamp: ts, } + // Check if the collection name duplicates an alias. + _, err = t.core.meta.DescribeAlias(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), typeutil.MaxTimestamp) + if err == nil { + err2 := fmt.Errorf("collection name [%s] conflicts with an existing alias, please choose a unique name", t.Req.GetCollectionName()) + log.Ctx(ctx).Warn("create collection failed", zap.String("database", t.Req.GetDbName()), zap.Error(err2)) + return err2 + } + // We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps // if add collection successfully due to idempotency check. Some steps may be risky to be duplicate executed if they // are not promised idempotent. @@ -613,6 +621,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { log.Ctx(ctx).Warn("add duplicate collection", zap.String("collection", t.Req.GetCollectionName()), zap.Uint64("ts", ts)) return nil } + log.Ctx(ctx).Info("check collection existence", zap.String("collection", t.Req.GetCollectionName()), zap.Error(err)) // TODO: The create collection is not idempotent for other component, such as wal. // we need to make the create collection operation must success after some persistent operation, refactor it in future. diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go index b47a1070b4..1376b1e58d 100644 --- a/internal/rootcoord/create_collection_task_test.go +++ b/internal/rootcoord/create_collection_task_test.go @@ -20,6 +20,7 @@ import ( "context" "math" "strconv" + "strings" "testing" "time" @@ -903,6 +904,8 @@ func Test_createCollectionTask_Execute(t *testing.T) { mock.Anything, mock.Anything, ).Return(coll, nil) + meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return("", merr.WrapErrAliasNotFound("", "")) core := newTestCore(withMeta(meta), withTtSynchronizer(ticker)) @@ -950,6 +953,8 @@ func Test_createCollectionTask_Execute(t *testing.T) { mock.Anything, mock.Anything, ).Return(coll, nil) + meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return("", merr.WrapErrAliasNotFound("", "")) core := newTestCore(withMeta(meta), withTtSynchronizer(ticker)) @@ -972,10 +977,11 @@ func Test_createCollectionTask_Execute(t *testing.T) { ticker := newTickerWithMockFailStream() shardNum := 2 pchans := ticker.getDmlChannelNames(shardNum) - meta := newMockMetaTable() - meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) { - return nil, errors.New("error mock GetCollectionByName") - } + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil, errors.New("error mock GetCollectionByName")) + meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return("", merr.WrapErrAliasNotFound("", "")) core := newTestCore(withTtSynchronizer(ticker), withMeta(meta)) schema := &schemapb.CollectionSchema{Name: "", Fields: []*schemapb.FieldSchema{{}}} task := &createCollectionTask{ @@ -996,6 +1002,40 @@ func Test_createCollectionTask_Execute(t *testing.T) { assert.Error(t, err) }) + t.Run("collection name duplicates an alias", func(t *testing.T) { + defer cleanTestEnv() + + collectionName := funcutil.GenRandomStr() + ticker := newRocksMqTtSynchronizer() + field1 := funcutil.GenRandomStr() + schema := &schemapb.CollectionSchema{Name: collectionName, Fields: []*schemapb.FieldSchema{{Name: field1}}} + + meta := mockrootcoord.NewIMetaTable(t) + // mock collection name duplicates an alias + meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(collectionName, nil) + + core := newTestCore(withMeta(meta), withTtSynchronizer(ticker)) + task := &createCollectionTask{ + baseTask: newBaseTask(context.Background(), core), + Req: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, + DbName: "mock-db", + CollectionName: collectionName, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.ConsistencyLevel, + Value: "1", + }, + }, + }, + schema: schema, + } + err := task.Execute(context.Background()) + assert.Error(t, err) + assert.True(t, strings.Contains(err.Error(), "conflicts with an existing alias")) + }) + t.Run("normal case", func(t *testing.T) { defer cleanTestEnv() @@ -1023,6 +1063,8 @@ func Test_createCollectionTask_Execute(t *testing.T) { mock.Anything, mock.Anything, ).Return(nil) + meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return("", merr.WrapErrAliasNotFound("", "")) dc := newMockDataCoord() dc.GetComponentStatesFunc = func(ctx context.Context) (*milvuspb.ComponentStates, error) { @@ -1107,6 +1149,8 @@ func Test_createCollectionTask_Execute(t *testing.T) { mock.Anything, mock.Anything, ).Return(errors.New("error mock ChangeCollectionState")) + meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return("", merr.WrapErrAliasNotFound("", "")) removeCollectionCalled := false removeCollectionChan := make(chan struct{}, 1) diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go index b435ad3d87..bff054cbc6 100644 --- a/internal/rootcoord/drop_collection_task.go +++ b/internal/rootcoord/drop_collection_task.go @@ -63,7 +63,6 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error { log.Ctx(ctx).Warn("drop non-existent collection", zap.String("collection", t.Req.GetCollectionName()), zap.String("database", t.Req.GetDbName())) return nil } - if err != nil { return err } @@ -71,6 +70,13 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error { // meta cache of all aliases should also be cleaned. aliases := t.core.meta.ListAliasesByID(ctx, collMeta.CollectionID) + // Check if all aliases have been dropped. + if len(aliases) > 0 { + err = fmt.Errorf("unable to drop the collection [%s] because it has associated aliases %v, please remove all aliases before dropping the collection", t.Req.GetCollectionName(), aliases) + log.Ctx(ctx).Warn("drop collection failed", zap.String("database", t.Req.GetDbName()), zap.Error(err)) + return err + } + ts := t.GetTs() return executeDropCollectionTaskSteps(ctx, t.core, collMeta, t.Req.GetDbName(), aliases, diff --git a/internal/rootcoord/drop_collection_task_test.go b/internal/rootcoord/drop_collection_task_test.go index f259aed26a..c218d51bd7 100644 --- a/internal/rootcoord/drop_collection_task_test.go +++ b/internal/rootcoord/drop_collection_task_test.go @@ -18,6 +18,7 @@ package rootcoord import ( "context" + "strings" "testing" "time" @@ -181,6 +182,40 @@ func Test_dropCollectionTask_Execute(t *testing.T) { assert.Error(t, err) }) + t.Run("aliases have not been dropped", func(t *testing.T) { + defer cleanTestEnv() + + collectionName := funcutil.GenRandomStr() + shardNum := 2 + + ticker := newRocksMqTtSynchronizer() + pchans := ticker.getDmlChannelNames(shardNum) + ticker.addDmlChannels(pchans...) + + coll := &model.Collection{Name: collectionName, ShardsNum: int32(shardNum), PhysicalChannelNames: pchans} + + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(coll.Clone(), nil) + meta.EXPECT().ListAliasesByID(mock.Anything, mock.Anything). + Return([]string{"mock-alias-0", "mock-alias-1"}) + + core := newTestCore( + withMeta(meta), + withTtSynchronizer(ticker)) + + task := &dropCollectionTask{ + baseTask: newBaseTask(context.Background(), core), + Req: &milvuspb.DropCollectionRequest{ + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}, + CollectionName: collectionName, + }, + } + err := task.Execute(context.Background()) + assert.Error(t, err) + assert.True(t, strings.Contains(err.Error(), "please remove all aliases")) + }) + t.Run("normal case, redo", func(t *testing.T) { defer cleanTestEnv() diff --git a/tests/python_client/milvus_client/test_milvus_client_alias.py b/tests/python_client/milvus_client/test_milvus_client_alias.py index 2442bef228..6fce497f8c 100644 --- a/tests/python_client/milvus_client/test_milvus_client_alias.py +++ b/tests/python_client/milvus_client/test_milvus_client_alias.py @@ -179,6 +179,7 @@ class TestMilvusClientAliasInvalid(TestMilvusClientV2Base): f"[alias={alias}]"} self.create_alias(client, collection_name_1, alias, check_task=CheckTasks.err_res, check_items=error) + self.drop_alias(client, alias) self.drop_collection(client, collection_name) @pytest.mark.tags(CaseLabel.L1) @@ -349,6 +350,7 @@ class TestMilvusClientAliasInvalid(TestMilvusClientV2Base): error = {ct.err_code: 1600, ct.err_msg: f"alias not found[database=default][alias={another_alias}]"} self.alter_alias(client, collection_name, another_alias, check_task=CheckTasks.err_res, check_items=error) + self.drop_alias(client, alias) self.drop_collection(client, collection_name) @@ -477,4 +479,6 @@ class TestMilvusClientAliasValid(TestMilvusClientV2Base): # 4. assert collection is equal to alias according to partitions partition_name_list_alias = self.list_partitions(client, another_alias)[0] assert partition_name_list == partition_name_list_alias + self.drop_alias(client, alias) + self.drop_alias(client, another_alias) self.drop_collection(client, collection_name) diff --git a/tests/python_client/milvus_client/test_milvus_client_search_iterator.py b/tests/python_client/milvus_client/test_milvus_client_search_iterator.py index eed3fb38e3..c2425c0875 100644 --- a/tests/python_client/milvus_client/test_milvus_client_search_iterator.py +++ b/tests/python_client/milvus_client/test_milvus_client_search_iterator.py @@ -122,6 +122,7 @@ class TestMilvusClientSearchIteratorInValid(TestMilvusClientV2Base): self.release_collection(client, collection_name) self.drop_collection(client, collection_name) self.release_collection(client, collection_name_new) + self.drop_alias(client, alias) self.drop_collection(client, collection_name_new) diff --git a/tests/python_client/testcases/test_alias.py b/tests/python_client/testcases/test_alias.py index b06d165f2f..cf7d45c189 100644 --- a/tests/python_client/testcases/test_alias.py +++ b/tests/python_client/testcases/test_alias.py @@ -222,6 +222,7 @@ class TestAliasOperation(TestcaseBase): self.utility_wrap.drop_collection(alias_name, check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.drop_alias(alias_name) self.utility_wrap.drop_collection(c_name) assert not self.utility_wrap.has_collection(c_name)[0] @@ -447,6 +448,7 @@ class TestAliasOperationInvalid(TestcaseBase): assert len(res) == 1 # dropping collection that has an alias shall drop the alias as well + self.utility_wrap.drop_alias(alias_name) collection_w.drop() collection_w = self.init_collection_wrap(name=c_name, schema=default_schema, check_task=CheckTasks.check_collection_property, @@ -454,13 +456,9 @@ class TestAliasOperationInvalid(TestcaseBase): res2 = self.utility_wrap.list_aliases(c_name)[0] assert len(res2) == 0 # the same alias name can be reused for another collection - error = {ct.err_code: 999, - ct.err_msg: f"{alias_name} is alias to another collection: {collection_w.name}: alias already exist"} - self.utility_wrap.create_alias(c_name, alias_name, - check_task=CheckTasks.err_res, - check_items=error) - # res2 = self.utility_wrap.list_aliases(c_name)[0] - # assert len(res2) == 1 + self.utility_wrap.create_alias(c_name, alias_name) + res2 = self.utility_wrap.list_aliases(c_name)[0] + assert len(res2) == 1 @pytest.mark.tags(CaseLabel.L0) def test_alias_rename_collection_to_alias_name(self): @@ -469,7 +467,7 @@ class TestAliasOperationInvalid(TestcaseBase): method: 1.create a collection 2.create an alias for the collection - 3.rename the collection to the alias name no matter the collection was dropped or not + 3.rename the collection to the alias name expected: in step 3, rename collection to alias name failed """ self._connect() @@ -483,10 +481,3 @@ class TestAliasOperationInvalid(TestcaseBase): ct.err_msg: f"cannot rename collection to an existing alias: {alias_name}"} self.utility_wrap.rename_collection(collection_w.name, alias_name, check_task=CheckTasks.err_res, check_items=error) - - collection_w.drop() - collection_w = self.init_collection_wrap(name=c_name, schema=default_schema, - check_task=CheckTasks.check_collection_property, - check_items={exp_name: c_name, exp_schema: default_schema}) - self.utility_wrap.rename_collection(collection_w.name, alias_name, - check_task=CheckTasks.err_res, check_items=error) diff --git a/tests/python_client/testcases/test_query.py b/tests/python_client/testcases/test_query.py index 8c07967aa6..49b9a6cda1 100644 --- a/tests/python_client/testcases/test_query.py +++ b/tests/python_client/testcases/test_query.py @@ -3737,6 +3737,7 @@ class TestQueryCount(TestcaseBase): collection_w_alias.drop(check_task=CheckTasks.err_res, check_items={ct.err_code: 1, ct.err_msg: "cannot drop the collection via alias"}) + self.utility_wrap.drop_alias(alias) collection_w.drop() @pytest.mark.tags(CaseLabel.L2)