Remove delta vchannel when clean up empty flowgraph (#20182)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/20189/head
congqixia 2022-10-29 18:31:33 +08:00 committed by GitHub
parent 977947e224
commit 19300bd36f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 0 deletions

View File

@ -288,6 +288,8 @@ func (dsService *dataSyncService) removeEmptyFlowGraphByChannel(collectionID int
fg.close()
delete(dsService.deltaChannel2FlowGraph, dc)
dsService.tSafeReplica.removeTSafe(dc)
// try best to remove, it's ok if all info is gone before this call
dsService.metaReplica.removeCollectionVDeltaChannel(collectionID, dc)
rateCol.removeTSafeChannel(dc)
}

View File

@ -82,6 +82,8 @@ type ReplicaInterface interface {
getPKFieldIDByCollectionID(collectionID UniqueID) (FieldID, error)
// getSegmentInfosByColID return segments info by collectionID
getSegmentInfosByColID(collectionID UniqueID) []*querypb.SegmentInfo
// removeCollectionVDeltaChannel remove vdelta channel replica info from collection.
removeCollectionVDeltaChannel(collectionID UniqueID, vDeltaChannel string)
// partition
// addPartition adds a new partition to collection
@ -853,6 +855,20 @@ func (replica *metaReplica) getSealedSegments() []*Segment {
return ret
}
// removeCollectionVDeltaChannel remove vdelta channel replica info from collection.
func (replica *metaReplica) removeCollectionVDeltaChannel(collectionID UniqueID, vDeltaChannel string) {
replica.mu.RLock()
defer replica.mu.RUnlock()
coll, ok := replica.collections[collectionID]
if !ok {
// if collection already release, that's fine and just return
return
}
coll.removeVDeltaChannel(vDeltaChannel)
}
// newCollectionReplica returns a new ReplicaInterface
func newCollectionReplica(pool *concurrency.Pool) ReplicaInterface {
var replica ReplicaInterface = &metaReplica{

View File

@ -408,6 +408,30 @@ func TestMetaReplica_BlackList(t *testing.T) {
}
func TestMetaReplica_removeCollectionVDeltaChannel(t *testing.T) {
replica, err := genSimpleReplica()
require.NoError(t, err)
// remove when collection not exists
assert.NotPanics(t, func() {
replica.removeCollectionVDeltaChannel(-1, defaultDeltaChannel)
})
schema := genTestCollectionSchema()
collection := replica.addCollection(defaultCollectionID, schema)
replica.addPartition(defaultCollectionID, defaultPartitionID)
replica.addPartition(defaultCollectionID, defaultPartitionID+1)
collection.addVDeltaChannels([]string{defaultDeltaChannel})
assert.NotPanics(t, func() {
replica.removeCollectionVDeltaChannel(defaultCollectionID, defaultDeltaChannel)
})
channels := collection.getVDeltaChannels()
assert.Equal(t, 0, len(channels))
}
func TestMetaReplica_freeAll(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)