mirror of https://github.com/milvus-io/milvus.git
Add nil check in reduce and add collection rwmutex when release segments (#17698)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/17716/head
parent
a077bad84a
commit
8cf54137cf
|
@ -349,7 +349,17 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// collection lock is not needed since we guarantee not query/search will be dispatch from leader
|
collection, err := node.metaReplica.getCollectionByID(in.CollectionID)
|
||||||
|
if err != nil {
|
||||||
|
status := &commonpb.Status{
|
||||||
|
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||||
|
Reason: fmt.Sprintf("cannot find collection %d when ReleaseSegments", in.CollectionID),
|
||||||
|
}
|
||||||
|
return status, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
collection.Lock()
|
||||||
|
defer collection.Unlock()
|
||||||
for _, id := range in.SegmentIDs {
|
for _, id := range in.SegmentIDs {
|
||||||
switch in.GetScope() {
|
switch in.GetScope() {
|
||||||
case queryPb.DataScope_Streaming:
|
case queryPb.DataScope_Streaming:
|
||||||
|
|
|
@ -444,6 +444,24 @@ func TestImpl_ReleaseSegments(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
|
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
t.Run("test no collection", func(t *testing.T) {
|
||||||
|
defer wg.Done()
|
||||||
|
node, err := genSimpleQueryNode(ctx)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = node.metaReplica.removeCollection(defaultCollectionID)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
req := &queryPb.ReleaseSegmentsRequest{
|
||||||
|
Base: genCommonMsgBase(commonpb.MsgType_ReleaseSegments),
|
||||||
|
CollectionID: defaultCollectionID,
|
||||||
|
}
|
||||||
|
|
||||||
|
status, err := node.ReleaseSegments(ctx, req)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||||
|
})
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,6 +92,9 @@ func reduceSearchResultsAndFillData(plan *SearchPlan, searchResults []*SearchRes
|
||||||
|
|
||||||
cSearchResults := make([]C.CSearchResult, 0)
|
cSearchResults := make([]C.CSearchResult, 0)
|
||||||
for _, res := range searchResults {
|
for _, res := range searchResults {
|
||||||
|
if res == nil {
|
||||||
|
return nil, fmt.Errorf("nil searchResult detected when reduceSearchResultsAndFillData")
|
||||||
|
}
|
||||||
cSearchResults = append(cSearchResults, res.cSearchResult)
|
cSearchResults = append(cSearchResults, res.cSearchResult)
|
||||||
}
|
}
|
||||||
cSearchResultPtr := (*C.CSearchResult)(&cSearchResults[0])
|
cSearchResultPtr := (*C.CSearchResult)(&cSearchResults[0])
|
||||||
|
|
|
@ -104,8 +104,20 @@ func TestReduce_AllFunc(t *testing.T) {
|
||||||
deleteCollection(collection)
|
deleteCollection(collection)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReduce_nilPlan(t *testing.T) {
|
func TestReduce_Invalid(t *testing.T) {
|
||||||
plan := &SearchPlan{}
|
t.Run("nil plan", func(t *testing.T) {
|
||||||
_, err := reduceSearchResultsAndFillData(plan, nil, 1, nil, nil)
|
plan := &SearchPlan{}
|
||||||
assert.Error(t, err)
|
_, err := reduceSearchResultsAndFillData(plan, nil, 1, nil, nil)
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("nil search result", func(t *testing.T) {
|
||||||
|
collection := newCollection(defaultCollectionID, genTestCollectionSchema())
|
||||||
|
searchReq, err := genSearchPlanAndRequests(collection, IndexHNSW, 10)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
searchResults := make([]*SearchResult, 0)
|
||||||
|
searchResults = append(searchResults, nil)
|
||||||
|
_, err = reduceSearchResultsAndFillData(searchReq.plan, searchResults, 1, []int32{10}, []int32{10})
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/metrics"
|
"github.com/milvus-io/milvus/internal/metrics"
|
||||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||||
)
|
)
|
||||||
|
@ -39,6 +40,7 @@ func searchOnSegments(replica ReplicaInterface, segType segmentType, searchReq *
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
seg, err := replica.getSegmentByID(segID, segType)
|
seg, err := replica.getSegmentByID(segID, segType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Error(err.Error()) // should not happen but still ignore it since the result is still correct
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// record search time
|
// record search time
|
||||||
|
|
Loading…
Reference in New Issue