Fix deadlocks (#20121)

Fixes deadlocks between the collection's lock and the meta replica's lock.

Signed-off-by: yah01 <yang.cen@zilliz.com>

pull/20111/head
yah01 2022-10-27 20:53:32 +08:00 committed by GitHub
parent 1be4d1e267
commit 404fc68afa
17 changed files with 74 additions and 67 deletions
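
Background for the fix: the deadlock class addressed here is lock-order inversion between the replica-level lock and a collection's lock. A minimal runnable sketch of the failure mode, using hypothetical stand-in types rather than the actual Milvus structs:

```go
package main

import "sync"

// Hypothetical stand-ins for the two lock holders involved.
type Collection struct{ mu sync.Mutex }

type Replica struct {
	mu  sync.Mutex
	col *Collection
}

// Goroutine A: replica lock first, then collection lock
// (the order metaReplica methods use after this commit).
func (r *Replica) removeSegment() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.col.mu.Lock() // blocks if B already holds col.mu
	defer r.col.mu.Unlock()
}

// Goroutine B: collection lock first, then replica lock -- the reversed
// order the flowgraph nodes used before this commit.
func (r *Replica) operate() {
	r.col.mu.Lock()
	defer r.col.mu.Unlock()
	r.mu.Lock() // blocks if A already holds r.mu => deadlock
	defer r.mu.Unlock()
}

func main() {
	r := &Replica{col: &Collection{}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); r.removeSegment() }()
	go func() { defer wg.Done(); r.operate() }()
	wg.Wait() // with unlucky scheduling this never returns: a classic AB-BA deadlock
}
```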

View File

@@ -44,7 +44,7 @@ import (
// Collection is a wrapper of the underlying C-structure C.CCollection
type Collection struct {
sync.RWMutex // protects collectionPtr
mu sync.RWMutex // protects collectionPtr
collectionPtr C.CCollection
id UniqueID
partitionIDs []UniqueID
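
Note on this hunk: the mutex was embedded before, which promotes Lock/RLock into Collection's method set and lets call sites take the collection lock as collection.Lock(), easily conflated with the replica lock. The unexported mu field makes every acquisition explicit. A condensed before/after sketch (stand-in type names):

```go
package sketch

import "sync"

// Before: the embedded mutex promotes Lock/RLock onto the struct itself,
// so call sites read collection.Lock() and it is unclear which lock in
// the hierarchy is being taken.
type collectionBefore struct {
	sync.RWMutex // protects collectionPtr
}

// After: an unexported named field; call sites must write
// collection.mu.Lock(), which names the lock explicitly.
type collectionAfter struct {
	mu sync.RWMutex // protects collectionPtr
}
```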

View File

@@ -82,17 +82,6 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msg.SetTraceCtx(ctx)
}
collection, err := dNode.metaReplica.getCollectionByID(dNode.collectionID)
if err != nil {
log.Warn("failed to get collection",
zap.Int64("collectionID", dNode.collectionID),
zap.String("channel", dNode.channel),
)
return []Msg{}
}
collection.RLock()
defer collection.RUnlock()
// 1. filter segment by bloom filter
for i, delMsg := range dMsg.deleteMessages {
traceID, _, _ := trace.InfoFromSpan(spans[i])

View File

@@ -188,9 +188,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
},
}
msg := []flowgraph.Msg{&dMsg}
assert.Panics(t, func() {
deleteNode.Operate(msg)
})
assert.Panics(t, func() { deleteNode.Operate(msg) })
})
t.Run("test partition not exist", func(t *testing.T) {
@@ -215,9 +213,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
},
}
msg := []flowgraph.Msg{&dMsg}
assert.Panics(t, func() {
deleteNode.Operate(msg)
})
assert.NotPanics(t, func() { deleteNode.Operate(msg) })
})
t.Run("test invalid input length", func(t *testing.T) {

View File

@@ -81,8 +81,6 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// QueryNode should add collection before start flow graph
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel = %s", fddNode.Name(), fddNode.collectionID, fddNode.channel))
}
collection.RLock()
defer collection.RUnlock()
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {

View File

@@ -85,8 +85,6 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// QueryNode should add collection before start flow graph
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel: %s", fdmNode.Name(), fdmNode.collectionID, fdmNode.channel))
}
collection.RLock()
defer collection.RUnlock()
for i, msg := range msgStreamMsg.TsMessages() {
traceID, _, _ := trace.InfoFromSpan(spans[i])

View File

@@ -108,8 +108,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// QueryNode should add collection before start flow graph
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel: %s", iNode.Name(), iNode.collectionID, iNode.channel))
}
collection.RLock()
defer collection.RUnlock()
// 1. hash insertMessages to insertData
// sort timestamps ensures that the data in iData.insertRecords is sorted in ascending order of timestamp
// avoiding re-sorting in segCore, which will need data copying
@@ -179,7 +178,10 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// should not happen, segment should be created before
err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err)
log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel))
panic(err)
if !errors.Is(err, ErrSegmentNotFound) {
panic(err)
}
}
var numOfRecords = len(iData.insertIDs[segmentID])
@@ -296,10 +298,14 @@ func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *m
var partitionIDs []UniqueID
var err error
if msg.PartitionID != -1 {
partitionIDs = []UniqueID{msg.PartitionID}
partitionIDs = []UniqueID{msg.GetPartitionID()}
} else {
partitionIDs, err = replica.getPartitionIDs(msg.CollectionID)
partitionIDs, err = replica.getPartitionIDs(msg.GetCollectionID())
if err != nil {
log.Warn("the collection has been released, ignore it",
zap.Int64("collectionID", msg.GetCollectionID()),
zap.Error(err),
)
return err
}
}
@@ -307,6 +313,10 @@ func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *m
for _, partitionID := range partitionIDs {
segmentIDs, err := replica.getSegmentIDs(partitionID, segType)
if err != nil {
// Skip this partition
if errors.Is(err, ErrPartitionNotFound) {
continue
}
return err
}
resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
@@ -374,6 +384,13 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID) error {
zap.Int64("segmentID", segmentID))
var targetSegment, err = iNode.metaReplica.getSegmentByID(segmentID, segmentTypeGrowing)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {
log.Warn("the segment has been released, ignore it",
zap.Int64("segmentID", segmentID),
zap.Error(err),
)
return nil
}
return fmt.Errorf("getSegmentByID failed, err = %s", err)
}
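
The skip-on-missing logic above relies on Go error wrapping: the replica wraps its sentinel errors with %w (ErrPartitionNotFound's wrapping appears in the meta_replica hunk further down; ErrSegmentNotFound is presumably wrapped the same way), so callers classify failures with errors.Is instead of panicking. A runnable sketch of the pattern with stand-alone names:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrSegmentNotFound = errors.New("SegmentNotFound")

func getSegmentByID(segmentID int64) error {
	// %w wraps the sentinel so it survives the added context.
	return fmt.Errorf("%w(segmentID=%d)", ErrSegmentNotFound, segmentID)
}

func main() {
	err := getSegmentByID(42)
	if errors.Is(err, ErrSegmentNotFound) {
		// A released segment is benign here: skip it instead of
		// panicking, as insertNode.insert now does.
		fmt.Println("segment released, ignoring:", err)
	}
}
```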

View File

@@ -126,7 +126,7 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
assert.NoError(t, err)
insertNode := newInsertNode(streaming, defaultCollectionID, defaultDMLChannel)
err = insertNode.insert(nil, defaultSegmentID)
assert.Error(t, err)
assert.NoError(t, err)
})
t.Run("test invalid segmentType", func(t *testing.T) {
@@ -288,9 +288,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
},
}
msg := []flowgraph.Msg{&iMsg}
assert.Panics(t, func() {
insertNode.Operate(msg)
})
assert.Panics(t, func() { insertNode.Operate(msg) })
})
t.Run("test partition not exist", func(t *testing.T) {
@@ -306,9 +304,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
},
}
msg := []flowgraph.Msg{&iMsg}
assert.Panics(t, func() {
insertNode.Operate(msg)
})
assert.NotPanics(t, func() { insertNode.Operate(msg) })
})
t.Run("test invalid input length", func(t *testing.T) {
@@ -327,9 +323,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
err = insertNode.metaReplica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
assert.Panics(t, func() {
insertNode.Operate(msg)
})
assert.Panics(t, func() { insertNode.Operate(msg) })
})
t.Run("test TransferInsertMsgToInsertRecord failed", func(t *testing.T) {

View File

@@ -45,6 +45,7 @@ import (
var (
ErrSegmentNotFound = errors.New("SegmentNotFound")
ErrPartitionNotFound = errors.New("PartitionNotFound")
ErrCollectionNotFound = errors.New("CollectionNotFound")
)
@@ -225,8 +226,8 @@ func (replica *metaReplica) removeCollectionPrivate(collectionID UniqueID) error
}
// block incoming search&query
collection.Lock()
defer collection.Unlock()
collection.mu.Lock()
defer collection.mu.Unlock()
// delete partitions
for _, partitionID := range collection.partitionIDs {
@@ -288,6 +289,8 @@ func (replica *metaReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID,
if err != nil {
return nil, err
}
collection.mu.RLock()
defer collection.mu.RUnlock()
return collection.getPartitionIDs(), nil
}
@@ -395,19 +398,22 @@ func (replica *metaReplica) getSegmentInfosByColID(collectionID UniqueID) []*que
func (replica *metaReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error {
replica.mu.Lock()
defer replica.mu.Unlock()
return replica.addPartitionPrivate(collectionID, partitionID)
}
// addPartitionPrivate is the private function in collectionReplica, to add a new partition to collection
func (replica *metaReplica) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
collection, err := replica.getCollectionByIDPrivate(collectionID)
if err != nil {
return err
}
collection.mu.Lock()
defer collection.mu.Unlock()
return replica.addPartitionPrivate(collection, partitionID)
}
// addPartitionPrivate is the private function in collectionReplica, to add a new partition to collection
func (replica *metaReplica) addPartitionPrivate(collection *Collection, partitionID UniqueID) error {
if !replica.hasPartitionPrivate(partitionID) {
collection.addPartitionID(partitionID)
var newPartition = newPartition(collectionID, partitionID)
var newPartition = newPartition(collection.ID(), partitionID)
replica.partitions[partitionID] = newPartition
}
@@ -429,8 +435,8 @@ func (replica *metaReplica) removePartition(partitionID UniqueID) error {
if err != nil {
return err
}
collection.Lock()
defer collection.Unlock()
collection.mu.Lock()
defer collection.mu.Unlock()
return replica.removePartitionPrivate(partitionID)
}
@@ -476,7 +482,7 @@ func (replica *metaReplica) getPartitionByID(partitionID UniqueID) (*Partition,
func (replica *metaReplica) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
partition, ok := replica.partitions[partitionID]
if !ok {
return nil, fmt.Errorf("partition %d hasn't been loaded or has been released", partitionID)
return nil, fmt.Errorf("%w(partitionID=%d)", ErrPartitionNotFound, partitionID)
}
return partition, nil
@@ -567,6 +573,9 @@ func (replica *metaReplica) addSegment(segmentID UniqueID, partitionID UniqueID,
if err != nil {
return err
}
collection.mu.Lock()
defer collection.mu.Unlock()
seg, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segType, version, replica.cgoPool)
if err != nil {
return err
@@ -635,15 +644,15 @@ func (replica *metaReplica) removeSegment(segmentID UniqueID, segType segmentTyp
case segmentTypeGrowing:
if segment, ok := replica.growingSegments[segmentID]; ok {
if collection, ok := replica.collections[segment.collectionID]; ok {
collection.Lock()
defer collection.Unlock()
collection.mu.Lock()
defer collection.mu.Unlock()
}
}
case segmentTypeSealed:
if segment, ok := replica.sealedSegments[segmentID]; ok {
if collection, ok := replica.collections[segment.collectionID]; ok {
collection.Lock()
defer collection.Unlock()
collection.mu.Lock()
defer collection.mu.Unlock()
}
}
default:
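
Taken together, these hunks establish one acquisition order: replica.mu first, then collection.mu, with the *Private helpers assuming their callers already hold the needed locks. As illustration only (this helper is not in the commit), the convention could be encoded in a single place like this:

```go
// Hypothetical helper, for illustration only: if every path that needs
// both locks went through here, the pair could never be held in opposite
// order by two goroutines.
func (replica *metaReplica) withCollection(collectionID UniqueID, fn func(*Collection) error) error {
	replica.mu.Lock() // level 1: replica-wide lock, always first
	defer replica.mu.Unlock()

	collection, err := replica.getCollectionByIDPrivate(collectionID)
	if err != nil {
		return err
	}

	collection.mu.Lock() // level 2: per-collection lock, only under level 1
	defer collection.mu.Unlock()
	return fn(collection)
}
```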

View File

@@ -185,6 +185,9 @@ type RetrievePlan struct {
}
func createRetrievePlanByExpr(col *Collection, expr []byte, timestamp Timestamp, msgID UniqueID) (*RetrievePlan, error) {
col.mu.RLock()
defer col.mu.RUnlock()
var cPlan C.CRetrievePlan
status := C.CreateRetrievePlanByExpr(col.collectionPtr, unsafe.Pointer(&expr[0]), (C.int64_t)(len(expr)), &cPlan)
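
The new read lock here pins collectionPtr across the CGO call: release frees the C pointer under collection.mu.Lock(), so holding RLock for the call's duration keeps the pointer valid. A generic runnable illustration, with a Go pointer standing in for C.CCollection:

```go
package main

import "sync"

// resource mimics Collection: ptr stands in for the C pointer that a
// release path frees while holding the write lock.
type resource struct {
	mu  sync.RWMutex
	ptr *int
}

func (r *resource) use() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.ptr == nil {
		return 0 // already released before we acquired the lock
	}
	return *r.ptr // safe: release() cannot run while any reader holds RLock
}

func (r *resource) release() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ptr = nil // exclusive: no reader is inside use() at this point
}

func main() {
	v := 7
	r := &resource{ptr: &v}
	go r.release()
	_ = r.use()
}
```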

View File

@@ -18,6 +18,7 @@ package querynode
import (
"context"
"errors"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/storage"
@@ -31,6 +32,9 @@ func retrieveOnSegments(ctx context.Context, replica ReplicaInterface, segType s
for _, segID := range segIDs {
seg, err := replica.getSegmentByID(segID, segType)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {
continue
}
return nil, err
}
result, err := seg.retrieve(plan)

View File

@@ -18,6 +18,7 @@ package querynode
import (
"context"
"errors"
"fmt"
"sync"
@@ -46,6 +47,9 @@ func searchOnSegments(ctx context.Context, replica ReplicaInterface, segType seg
defer wg.Done()
seg, err := replica.getSegmentByID(segID, segType)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {
return
}
log.Error(err.Error()) // should not happen but still ignore it since the result is still correct
return
}

View File

@@ -2,6 +2,7 @@ package querynode
import (
"context"
"errors"
"fmt"
"sync"
@@ -23,6 +24,9 @@ func statisticOnSegments(replica ReplicaInterface, segType segmentType, segIDs [
defer wg.Done()
seg, err := replica.getSegmentByID(segID, segType)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {
return
}
log.Error(err.Error()) // should not happen but still ignore it since the result is still correct
return
}
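
Both searchOnSegments and statisticOnSegments now treat a segment released mid-request as skippable rather than fatal, so a query returns partial results instead of failing. A self-contained sketch of this fan-out-and-skip shape, using hypothetical helper names:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errSegmentNotFound = errors.New("SegmentNotFound") // stand-in sentinel

// searchAll fans out one worker per segment and silently drops segments
// that were released concurrently, mirroring the hunks above.
func searchAll(segIDs []int64, get func(int64) (string, error)) []string {
	var (
		mu      sync.Mutex
		results []string
		wg      sync.WaitGroup
	)
	for _, id := range segIDs {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			seg, err := get(id)
			if err != nil {
				if errors.Is(err, errSegmentNotFound) {
					return // released mid-flight; partial results are acceptable
				}
				fmt.Println("unexpected error:", err) // should not happen
				return
			}
			mu.Lock()
			results = append(results, seg)
			mu.Unlock()
		}(id)
	}
	wg.Wait()
	return results
}

func main() {
	get := func(id int64) (string, error) {
		if id == 2 {
			return "", fmt.Errorf("%w(segmentID=%d)", errSegmentNotFound, id)
		}
		return fmt.Sprintf("segment-%d", id), nil
	}
	fmt.Println(searchAll([]int64{1, 2, 3}, get))
}
```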

View File

@@ -66,8 +66,6 @@ func (q *queryTask) queryOnStreaming() error {
return err
}
q.QS.collection.RLock() // locks the collectionPtr
defer q.QS.collection.RUnlock()
if _, released := q.QS.collection.getReleaseTime(); released {
log.Ctx(ctx).Debug("collection release before search", zap.Int64("msgID", q.ID()),
zap.Int64("collectionID", q.CollectionID))
@@ -114,9 +112,6 @@ func (q *queryTask) queryOnHistorical() error {
return err
}
q.QS.collection.RLock() // locks the collectionPtr
defer q.QS.collection.RUnlock()
if _, released := q.QS.collection.getReleaseTime(); released {
log.Ctx(ctx).Debug("collection release before search", zap.Int64("msgID", q.ID()),
zap.Int64("collectionID", q.CollectionID))

View File

@@ -102,8 +102,6 @@ func (s *searchTask) searchOnStreaming() error {
return err
}
s.QS.collection.RLock() // locks the collectionPtr
defer s.QS.collection.RUnlock()
if _, released := s.QS.collection.getReleaseTime(); released {
log.Ctx(ctx).Debug("collection release before search", zap.Int64("msgID", s.ID()),
zap.Int64("collectionID", s.CollectionID))
@@ -139,8 +137,6 @@ func (s *searchTask) searchOnHistorical() error {
return err
}
s.QS.collection.RLock() // locks the collectionPtr
defer s.QS.collection.RUnlock()
if _, released := s.QS.collection.getReleaseTime(); released {
log.Ctx(ctx).Warn("collection release before search", zap.Int64("msgID", s.ID()),
zap.Int64("collectionID", s.CollectionID))

View File

@@ -45,8 +45,6 @@ func (s *statistics) statisticOnStreaming() error {
return err
}
s.qs.collection.RLock() // locks the collectionPtr
defer s.qs.collection.RUnlock()
if _, released := s.qs.collection.getReleaseTime(); released {
log.Ctx(ctx).Warn("collection release before do statistics", zap.Int64("msgID", s.id),
zap.Int64("collectionID", s.iReq.GetCollectionID()))
@@ -76,8 +74,6 @@ func (s *statistics) statisticOnHistorical() error {
return err
}
s.qs.collection.RLock() // locks the collectionPtr
defer s.qs.collection.RUnlock()
if _, released := s.qs.collection.getReleaseTime(); released {
log.Ctx(ctx).Debug("collection release before do statistics", zap.Int64("msgID", s.id),
zap.Int64("collectionID", s.iReq.GetCollectionID()))
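
The task-level read locks removed in the query, search, and statistics tasks above were the other half of the lock-order cycle. They are safe to drop because plan creation now takes col.mu.RLock itself (e.g. createRetrievePlanByExpr in the plan hunk above), and checking release state does not need the structural lock if that state has its own synchronization. A minimal sketch of this assumption; hypothetical fields, and the real getReleaseTime may synchronize differently:

```go
package sketch

import (
	"sync"
	"sync/atomic"
)

// Collection sketch: the structural lock still guards the C pointer, but
// the release state is published through its own atomic, so tasks can call
// getReleaseTime without touching mu.
type Collection struct {
	mu          sync.RWMutex // protects collectionPtr, taken by plan creation
	releaseTime atomic.Int64 // 0 means "not released"
}

func (c *Collection) getReleaseTime() (int64, bool) {
	t := c.releaseTime.Load()
	return t, t != 0 // safe without holding c.mu
}
```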

View File

@@ -75,7 +75,11 @@ func validateOnHistoricalReplica(ctx context.Context, replica ReplicaInterface,
newSegmentIDs = segmentIDs
for _, segmentID := range newSegmentIDs {
var segment *Segment
if segment, err = replica.getSegmentByID(segmentID, segmentTypeSealed); err != nil {
segment, err = replica.getSegmentByID(segmentID, segmentTypeSealed)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {
continue
}
return searchPartIDs, newSegmentIDs, err
}
if !inList(searchPartIDs, segment.partitionID) {

View File

@@ -59,7 +59,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
his, err := genSimpleReplicaWithSealSegment(ctx)
assert.NoError(t, err)
_, _, err = validateOnHistoricalReplica(context.TODO(), his, defaultCollectionID, []UniqueID{defaultPartitionID}, []UniqueID{defaultSegmentID + 1})
assert.Error(t, err)
assert.NoError(t, err)
})
t.Run("test validate segment not in given partition", func(t *testing.T) {
@@ -89,7 +89,7 @@ func TestQueryShardHistorical_validateSegmentIDs(t *testing.T) {
err = his.removePartition(defaultPartitionID)
assert.NoError(t, err)
_, _, err = validateOnHistoricalReplica(context.TODO(), his, defaultCollectionID, []UniqueID{}, []UniqueID{defaultSegmentID})
assert.Error(t, err)
assert.NoError(t, err)
})
t.Run("test validate after partition release2", func(t *testing.T) {