Release collection resources when all partition released (#18488)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/18492/head
congqixia 2022-08-02 11:40:34 +08:00 committed by GitHub
parent 554994d8d7
commit ab461c6e5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 127 additions and 17 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type task interface {
@ -599,6 +600,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
r.node.metaReplica.removeExcludedSegments(r.req.CollectionID)
r.node.queryShardService.releaseCollection(r.req.CollectionID)
r.node.ShardClusterService.releaseCollection(r.req.CollectionID)
err = r.node.metaReplica.removeCollection(r.req.CollectionID)
if err != nil {
return err
@ -612,29 +614,84 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
// releasePartitionsTask
func (r *releasePartitionsTask) Execute(ctx context.Context) error {
log.Info("Execute release partition task",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
zap.Int64("collectionID", r.req.GetCollectionID()),
zap.Int64s("partitionIDs", r.req.GetPartitionIDs()))
_, err := r.node.metaReplica.getCollectionByID(r.req.CollectionID)
coll, err := r.node.metaReplica.getCollectionByID(r.req.CollectionID)
if err != nil {
return fmt.Errorf("release partitions failed, collectionID = %d, err = %s", r.req.CollectionID, err)
}
log.Info("start release partition", zap.Any("collectionID", r.req.CollectionID))
// skip error if collection not found, do clean up job below
log.Warn("failed to get collection for release partitions", zap.Int64("collectionID", r.req.GetCollectionID()),
zap.Int64s("partitionIDs", r.req.GetPartitionIDs()))
for _, id := range r.req.PartitionIDs {
// remove partition from streaming and historical
hasPartition := r.node.metaReplica.hasPartition(id)
if hasPartition {
err := r.node.metaReplica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Warn(err.Error())
}
log.Info("start release partition", zap.Int64("collectionID", r.req.GetCollectionID()), zap.Int64s("partitionIDs", r.req.GetPartitionIDs()))
// shall be false if coll is nil
releaseAll := r.isAllPartitionsReleased(coll)
if releaseAll {
// set release time
log.Info("set release time", zap.Int64("collectionID", r.req.CollectionID))
coll.setReleaseTime(r.req.Base.Timestamp, true)
// remove all flow graphs of the target collection
vChannels := coll.getVChannels()
vDeltaChannels := coll.getVDeltaChannels()
r.node.dataSyncService.removeFlowGraphsByDMLChannels(vChannels)
r.node.dataSyncService.removeFlowGraphsByDeltaChannels(vDeltaChannels)
// remove all tSafes of the target collection
for _, channel := range vChannels {
r.node.tSafeReplica.removeTSafe(channel)
}
for _, channel := range vDeltaChannels {
r.node.tSafeReplica.removeTSafe(channel)
}
log.Info("Release tSafe in releaseCollectionTask",
zap.Int64("collectionID", r.req.CollectionID),
zap.Strings("vChannels", vChannels),
zap.Strings("vDeltaChannels", vDeltaChannels),
)
r.node.metaReplica.removeExcludedSegments(r.req.CollectionID)
r.node.queryShardService.releaseCollection(r.req.CollectionID)
r.node.ShardClusterService.releaseCollection(r.req.CollectionID)
err = r.node.metaReplica.removeCollection(r.req.CollectionID)
if err != nil {
log.Warn("failed to remove collection", zap.Int64("collectionID", r.req.GetCollectionID()),
zap.Int64s("partitionIDs", r.req.GetPartitionIDs()), zap.Error(err))
}
} else {
for _, id := range r.req.PartitionIDs {
// remove partition from streaming and historical
hasPartition := r.node.metaReplica.hasPartition(id)
if hasPartition {
err := r.node.metaReplica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Warn(err.Error())
}
}
}
}
log.Info("Release partition task done",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
zap.Int64("collectionID", r.req.CollectionID),
zap.Int64s("partitionIDs", r.req.PartitionIDs))
return nil
}
func (r *releasePartitionsTask) isAllPartitionsReleased(coll *Collection) bool {
if coll == nil {
return false
}
if len(r.req.GetPartitionIDs()) < len(coll.partitionIDs) && len(coll.partitionIDs) > 0 {
return false
}
parts := make(typeutil.UniqueSet)
for _, partID := range r.req.GetPartitionIDs() {
parts.Insert(partID)
}
return parts.Contain(coll.partitionIDs...)
}

View File

@ -23,6 +23,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -728,6 +729,28 @@ func TestTask_releasePartitionTask(t *testing.T) {
assert.NoError(t, err)
})
t.Run("test isAllPartitionsReleased", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
node: node,
}
coll, err := node.metaReplica.getCollectionByID(defaultCollectionID)
require.NoError(t, err)
assert.False(t, task.isAllPartitionsReleased(nil))
assert.True(t, task.isAllPartitionsReleased(coll))
node.metaReplica.addPartition(defaultCollectionID, -1)
assert.False(t, task.isAllPartitionsReleased(coll))
node.metaReplica.removePartition(defaultPartitionID)
node.metaReplica.removePartition(-1)
assert.True(t, task.isAllPartitionsReleased(coll))
})
t.Run("test execute", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
@ -758,7 +781,37 @@ func TestTask_releasePartitionTask(t *testing.T) {
assert.NoError(t, err)
err = task.Execute(ctx)
assert.Error(t, err)
assert.NoError(t, err)
})
t.Run("test execute no partition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
node: node,
}
err = node.metaReplica.removePartition(defaultPartitionID)
assert.NoError(t, err)
err = task.Execute(ctx)
assert.NoError(t, err)
})
t.Run("test execute non-exist partition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
req := genReleasePartitionsRequest()
req.PartitionIDs = []int64{-1}
task := releasePartitionsTask{
req: req,
node: node,
}
err = task.Execute(ctx)
assert.NoError(t, err)
})
t.Run("test execute remove deltaVChannel", func(t *testing.T) {