mirror of https://github.com/milvus-io/milvus.git
Dropping index must happen without collection being loaded (#19886)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com> Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/19911/head
parent
0883599af8
commit
836081c8c7
|
@ -2130,6 +2130,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq
|
|||
Condition: NewTaskCondition(ctx),
|
||||
DropIndexRequest: request,
|
||||
indexCoord: node.indexCoord,
|
||||
queryCoord: node.queryCoord,
|
||||
}
|
||||
|
||||
method := "DropIndex"
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
@ -434,6 +435,7 @@ type dropIndexTask struct {
|
|||
ctx context.Context
|
||||
*milvuspb.DropIndexRequest
|
||||
indexCoord types.IndexCoord
|
||||
queryCoord types.QueryCoord
|
||||
result *commonpb.Status
|
||||
|
||||
collectionID UniqueID
|
||||
|
@ -497,6 +499,28 @@ func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
|
|||
collID, _ := globalMetaCache.GetCollectionID(ctx, dit.CollectionName)
|
||||
dit.collectionID = collID
|
||||
|
||||
// get all loading collections
|
||||
resp, err := dit.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
|
||||
CollectionIDs: nil,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return errors.New(resp.Status.Reason)
|
||||
}
|
||||
|
||||
loaded := false
|
||||
for _, loadedCollID := range resp.GetCollectionIDs() {
|
||||
if collID == loadedCollID {
|
||||
loaded = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if loaded {
|
||||
return errors.New("index cannot be dropped, collection is loaded, please release it first")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -18,12 +18,13 @@ package proxy
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
@ -87,3 +88,111 @@ func TestGetIndexStateTask_Execute(t *testing.T) {
|
|||
assert.NoError(t, gist.Execute(ctx))
|
||||
assert.Equal(t, commonpb.IndexState_Finished, gist.result.GetState())
|
||||
}
|
||||
|
||||
func TestDropIndexTask_PreExecute(t *testing.T) {
|
||||
collectionName := "collection1"
|
||||
collectionID := UniqueID(1)
|
||||
fieldName := "field1"
|
||||
indexName := ""
|
||||
|
||||
Params.Init()
|
||||
rc := newMockRootCoord()
|
||||
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
|
||||
return &querypb.ShowCollectionsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
CollectionIDs: nil,
|
||||
}, nil
|
||||
}
|
||||
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock))
|
||||
ic := newMockIndexCoord()
|
||||
ctx := context.Background()
|
||||
qc.updateState(commonpb.StateCode_Healthy)
|
||||
|
||||
shardMgr := newShardClientMgr()
|
||||
// failed to get collection id.
|
||||
_ = InitMetaCache(ctx, rc, qc, shardMgr)
|
||||
|
||||
rc.DescribeCollectionFunc = func(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
return &milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
Schema: newTestSchema(),
|
||||
CollectionID: collectionID,
|
||||
CollectionName: request.CollectionName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
dit := dropIndexTask{
|
||||
ctx: ctx,
|
||||
DropIndexRequest: &milvuspb.DropIndexRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0,
|
||||
MsgID: 0,
|
||||
Timestamp: 0,
|
||||
SourceID: 0,
|
||||
TargetID: 0,
|
||||
},
|
||||
CollectionName: collectionName,
|
||||
FieldName: fieldName,
|
||||
IndexName: indexName,
|
||||
},
|
||||
indexCoord: ic,
|
||||
queryCoord: qc,
|
||||
result: nil,
|
||||
collectionID: collectionID,
|
||||
}
|
||||
|
||||
t.Run("normal", func(t *testing.T) {
|
||||
err := dit.PreExecute(ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("coll has been loaded", func(t *testing.T) {
|
||||
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
|
||||
return &querypb.ShowCollectionsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
CollectionIDs: []int64{collectionID},
|
||||
}, nil
|
||||
}
|
||||
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock))
|
||||
qc.updateState(commonpb.StateCode_Healthy)
|
||||
dit.queryCoord = qc
|
||||
|
||||
err := dit.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("show collection error", func(t *testing.T) {
|
||||
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
|
||||
return nil, errors.New("error")
|
||||
}
|
||||
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock))
|
||||
qc.updateState(commonpb.StateCode_Healthy)
|
||||
dit.queryCoord = qc
|
||||
|
||||
err := dit.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("show collection fail", func(t *testing.T) {
|
||||
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
|
||||
return &querypb.ShowCollectionsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "fail reason",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock))
|
||||
qc.updateState(commonpb.StateCode_Healthy)
|
||||
dit.queryCoord = qc
|
||||
|
||||
err := dit.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -671,6 +671,7 @@ class TestNewIndexBase(TestcaseBase):
|
|||
index_name = cf.gen_unique_str("name")
|
||||
collection_w.create_index(default_float_vec_field_name, index, index_name=index_name)
|
||||
collection_w.load()
|
||||
collection_w.release()
|
||||
collection_w.drop_index(index_name=index_name)
|
||||
assert len(collection_w.collection.indexes) == 0
|
||||
|
||||
|
@ -1275,6 +1276,7 @@ class TestNewIndexAsync(TestcaseBase):
|
|||
assert len(search_res) == ct.default_nq
|
||||
assert len(search_res[0]) == ct.default_limit
|
||||
|
||||
collection_w.release()
|
||||
if _async:
|
||||
res.done()
|
||||
assert collection_w.indexes[0].params == default_index_params
|
||||
|
|
Loading…
Reference in New Issue