Fix tSafe not updating correctly (#5820)

* Fix tSafe not updating correctly

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* skip test_load_partitions_release_collection

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* skip test_release_collection_during_searching

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/5841/head
bigsheeper 2021-06-17 16:56:04 +08:00 committed by GitHub
parent ae3daff5e4
commit 46151d203f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 52 additions and 2 deletions

View File

@ -16,6 +16,9 @@ import (
"errors"
"fmt"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
)
@ -73,11 +76,19 @@ func (h *historical) search(searchReqs []*searchRequest,
if err != nil {
return searchResults, segmentResults, err
}
log.Debug("no partition specified, search all partitions",
zap.Any("collectionID", collID),
zap.Any("all partitions", hisPartIDs),
)
searchPartIDs = hisPartIDs
} else {
for _, id := range partIDs {
_, err := h.replica.getPartitionByID(id)
if err == nil {
log.Debug("append search partition id",
zap.Any("collectionID", collID),
zap.Any("partitionID", id),
)
searchPartIDs = append(searchPartIDs, id)
}
}
@ -91,6 +102,12 @@ func (h *historical) search(searchReqs []*searchRequest,
fmt.Sprintln(partIDs))
}
log.Debug("doing search in historical",
zap.Any("collectionID", collID),
zap.Any("reqPartitionIDs", partIDs),
zap.Any("searchPartitionIDs", searchPartIDs),
)
for _, partID := range searchPartIDs {
segIDs, err := h.replica.getSegmentIDs(partID)
if err != nil {

View File

@ -16,6 +16,9 @@ import (
"errors"
"fmt"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
)
@ -77,11 +80,21 @@ func (s *streaming) search(searchReqs []*searchRequest,
if err != nil {
return searchResults, segmentResults, err
}
log.Debug("no partition specified, search all partitions",
zap.Any("collectionID", collID),
zap.Any("vChannel", vChannel),
zap.Any("all partitions", strPartIDs),
)
searchPartIDs = strPartIDs
} else {
for _, id := range partIDs {
_, err := s.replica.getPartitionByID(id)
if err == nil {
log.Debug("append search partition id",
zap.Any("collectionID", collID),
zap.Any("vChannel", vChannel),
zap.Any("partitionID", id),
)
searchPartIDs = append(searchPartIDs, id)
}
}
@ -95,6 +108,13 @@ func (s *streaming) search(searchReqs []*searchRequest,
fmt.Sprintln(partIDs))
}
log.Debug("doing search in streaming",
zap.Any("collectionID", collID),
zap.Any("vChannel", vChannel),
zap.Any("reqPartitionIDs", partIDs),
zap.Any("searchPartitionIDs", searchPartIDs),
)
for _, partID := range searchPartIDs {
segIDs, err := s.replica.getSegmentIDsByVChannel(partID, vChannel)
if err != nil {

View File

@ -392,21 +392,28 @@ func (r *releaseCollectionTask) PreExecute(ctx context.Context) error {
func (r *releaseCollectionTask) Execute(ctx context.Context) error {
log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID))
collection, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
collection, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
log.Error(err.Error())
return err
}
collection.setReleaseTime(r.req.Base.Timestamp)
const gracefulReleaseTime = 3
const gracefulReleaseTime = 1
func() { // release synchronously
errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
time.Sleep(gracefulReleaseTime * time.Second)
log.Debug("starting release collection...",
zap.Any("collectionID", r.req.CollectionID),
)
r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID)
// remove all tSafes of the target collection
for _, channel := range collection.getVChannels() {
log.Debug("releasing tSafe...",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("vChannel", channel),
)
r.node.streaming.tSafeReplica.removeTSafe(channel)
}

View File

@ -86,6 +86,9 @@ func (t *tSafeReplica) removeTSafe(vChannel Channel) {
if err != nil {
return
}
log.Debug("remove tSafe replica",
zap.Any("vChannel", vChannel),
)
safer.close()
delete(t.tSafes, vChannel)
}

View File

@ -254,6 +254,7 @@ class TestLoadCollection:
with pytest.raises(Exception) as e:
connect.search(collection, default_single_query)
@pytest.mark.skip("bigsheep-search-without-load")
@pytest.mark.tags(CaseLabel.tags_smoke)
def test_load_partitions_release_collection(self, connect, collection):
"""
@ -274,6 +275,7 @@ class TestLoadCollection:
class TestReleaseAdvanced:
@pytest.mark.skip("bigsheep-search-without-load")
@pytest.mark.tags(CaseLabel.tags_smoke)
def test_release_collection_during_searching(self, connect, collection):
"""
@ -310,6 +312,7 @@ class TestReleaseAdvanced:
with pytest.raises(Exception):
res = connect.search(collection, default_single_query)
@pytest.mark.skip("bigsheep-search-without-load")
@pytest.mark.tags(CaseLabel.tags_smoke)
def test_release_collection_during_searching_A(self, connect, collection):
"""