Add constraint of releasing partitions before dropping (#23732)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/23790/head
yihao.dai 2023-04-28 11:12:36 +08:00 committed by GitHub
parent 017b15af6a
commit 6f94bfd26d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 0 deletions

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -866,6 +867,32 @@ func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error {
return err
}
collID, err := globalMetaCache.GetCollectionID(ctx, dpt.GetCollectionName())
if err != nil {
return err
}
partID, err := globalMetaCache.GetPartitionID(ctx, dpt.GetCollectionName(), dpt.GetPartitionName())
if err != nil {
if errors.Is(merr.ErrPartitionNotFound, err) {
return nil
}
return err
}
collLoaded, err := isCollectionLoaded(ctx, dpt.queryCoord, collID)
if err != nil {
return err
}
if collLoaded {
loaded, err := isPartitionLoaded(ctx, dpt.queryCoord, collID, []int64{partID})
if err != nil {
return err
}
if loaded {
return errors.New("partition cannot be dropped, partition is loaded, please release it first")
}
}
return nil
}

View File

@ -1135,6 +1135,20 @@ func TestDropPartitionTask(t *testing.T) {
err = task.PreExecute(ctx)
assert.NotNil(t, err)
t.Run("get collectionID error", func(t *testing.T) {
mockCache := newMockCache()
mockCache.setGetPartitionIDFunc(func(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
return 1, nil
})
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return 0, errors.New("error")
})
globalMetaCache = mockCache
task.PartitionName = "partition1"
err = task.PreExecute(ctx)
assert.Error(t, err)
})
t.Run("partition not exist", func(t *testing.T) {
task.PartitionName = "partition2"
@ -1149,6 +1163,21 @@ func TestDropPartitionTask(t *testing.T) {
err = task.PreExecute(ctx)
assert.NoError(t, err)
})
t.Run("get partition error", func(t *testing.T) {
task.PartitionName = "partition3"
mockCache := newMockCache()
mockCache.setGetPartitionIDFunc(func(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
return 0, errors.New("error")
})
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return 1, nil
})
globalMetaCache = mockCache
err = task.PreExecute(ctx)
assert.Error(t, err)
})
}
func TestHasPartitionTask(t *testing.T) {

View File

@ -2049,6 +2049,7 @@ class TestQueryCount(TestcaseBase):
)
# drop p1 partition
p1.release()
p1.drop()
collection_w.query(expr=default_expr, output_fields=[ct.default_count_output],
partition_names=[p1.name],