Add unsubscribe dmchannel task (#21513)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
Co-authored-by: sunby <bingyi.sun@zilliz.com>
pull/21804/head
Bingyi Sun 2023-01-18 17:45:44 +08:00 committed by GitHub
parent 297fef67c7
commit c82b5d15b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 13 deletions

View File

@ -412,20 +412,17 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
return status, nil
}
dct := &releaseCollectionTask{
unsubTask := &unsubDmChannelTask{
baseTask: baseTask{
ctx: ctx,
done: make(chan error),
},
req: &querypb.ReleaseCollectionRequest{
Base: req.GetBase(),
CollectionID: req.GetCollectionID(),
NodeID: req.GetNodeID(),
},
node: node,
node: node,
collectionID: req.GetCollectionID(),
channel: req.GetChannelName(),
}
err := node.scheduler.queue.Enqueue(dct)
err := node.scheduler.queue.Enqueue(unsubTask)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -434,9 +431,9 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
log.Warn("failed to enqueue subscribe channel task", zap.Error(err))
return status, nil
}
log.Info("unsubDmChannel(ReleaseCollection) enqueue done", zap.Int64("collectionID", req.GetCollectionID()))
log.Info("unsubDmChannelTask enqueue done", zap.Int64("collectionID", req.GetCollectionID()))
err = dct.WaitToFinish()
err = unsubTask.WaitToFinish()
if err != nil {
log.Warn("failed to do subscribe channel task successfully", zap.Error(err))
return &commonpb.Status{
@ -445,7 +442,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
}, nil
}
log.Info("unsubDmChannel(ReleaseCollection) WaitToFinish done", zap.Int64("collectionID", req.GetCollectionID()))
log.Info("unsubDmChannelTask WaitToFinish done", zap.Int64("collectionID", req.GetCollectionID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil

View File

@ -227,7 +227,7 @@ func TestImpl_UnsubDmChannel(t *testing.T) {
Infos: []*datapb.VchannelInfo{
{
CollectionID: 1000,
ChannelName: "1000-dmc0",
ChannelName: Params.CommonCfg.RootCoordDml.GetValue() + "-dmc0",
},
},
}
@ -245,7 +245,7 @@ func TestImpl_UnsubDmChannel(t *testing.T) {
},
NodeID: 0,
CollectionID: defaultCollectionID,
ChannelName: "1000-dmc0",
ChannelName: Params.CommonCfg.RootCoordDml.GetValue() + "-dmc0",
}
originMetaReplica := node.metaReplica
node.metaReplica = newMockReplicaInterface()

View File

@ -148,3 +148,15 @@ func (q *queryShardService) releaseCollection(collectionID int64) {
q.queryShardsMu.Unlock()
log.Info("release collection in query shard service", zap.Int64("collectionId", collectionID))
}
func (q *queryShardService) releaseQueryShard(channel string) {
q.queryShardsMu.Lock()
defer q.queryShardsMu.Unlock()
for ch, queryShard := range q.queryShards {
if ch == channel {
queryShard.Close()
delete(q.queryShards, ch)
break
}
}
}

View File

@ -28,6 +28,8 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var ErrChannelNotFound = errors.New("channel not found")
type task interface {
ID() UniqueID // return ReqID
Ctx() context.Context
@ -226,3 +228,71 @@ func (r *releasePartitionsTask) isAllPartitionsReleased(coll *Collection) bool {
return parts.Contain(coll.partitionIDs...)
}
type unsubDmChannelTask struct {
baseTask
node *QueryNode
collectionID int64
channel string
}
func (t *unsubDmChannelTask) Execute(ctx context.Context) error {
log.Info("start to execute unsubscribe dmchannel task", zap.Int64("collectionID", t.collectionID), zap.String("channel", t.channel))
collection, err := t.node.metaReplica.getCollectionByID(t.collectionID)
if err != nil {
if errors.Is(err, ErrCollectionNotFound) {
log.Info("collection has been released",
zap.Int64("collectionID", t.collectionID),
zap.Error(err),
)
return nil
}
return err
}
channels := collection.getVChannels()
var find bool
for _, c := range channels {
if c == t.channel {
find = true
break
}
}
if !find {
return ErrChannelNotFound
}
if err := t.releaseChannelResources(collection); err != nil {
return err
}
debug.FreeOSMemory()
return nil
}
func (t *unsubDmChannelTask) releaseChannelResources(collection *Collection) error {
log := log.With(zap.Int64("collectionID", t.collectionID), zap.String("channel", t.channel))
log.Info("start to release channel resources")
collection.removeVChannel(t.channel)
// release flowgraph resources
t.node.dataSyncService.removeFlowGraphsByDMLChannels([]string{t.channel})
t.node.queryShardService.releaseQueryShard(t.channel)
t.node.ShardClusterService.releaseShardCluster(t.channel)
t.node.tSafeReplica.removeTSafe(t.channel)
log.Info("release channel related resources successfully")
// release segment resources
segmentIDs, err := t.node.metaReplica.getSegmentIDsByVChannel(nil, t.channel, segmentTypeGrowing)
if err != nil {
return err
}
for _, segmentID := range segmentIDs {
t.node.metaReplica.removeSegment(segmentID, segmentTypeGrowing)
}
t.node.dataSyncService.removeEmptyFlowGraphByChannel(t.collectionID, t.channel)
log.Info("release segment resources successfully")
return nil
}