mirror of https://github.com/milvus-io/milvus.git
Cherry-pick from master PR #32291.

`applyDelete` used to apply the delete entries for each segment serially. This PR parallelizes that per-segment work, fanning it out to a bounded worker pool, to improve performance.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

pull/32675/head
parent 4f465343d3
commit 193f88bec3
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"runtime"
 	"sort"
 
 	"github.com/cockroachdb/errors"
@@ -43,6 +44,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
+	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -277,52 +279,66 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
 
 // applyDelete handles delete record and apply them to corresponding workers.
 func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 {
-	var offlineSegments []int64
+	offlineSegments := typeutil.NewConcurrentSet[int64]()
 	log := sd.getLogger(ctx)
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0) * 4)
+	defer pool.Release()
+
+	var futures []*conc.Future[struct{}]
 	for _, segmentEntry := range entries {
 		log := log.With(
 			zap.Int64("segmentID", segmentEntry.SegmentID),
 			zap.Int64("workerID", nodeID),
 		)
+		segmentEntry := segmentEntry
 		delRecord, ok := delRecords[segmentEntry.SegmentID]
 		if ok {
-			log.Debug("delegator plan to applyDelete via worker")
-			err := retry.Do(ctx, func() error {
-				if sd.Stopped() {
-					return retry.Unrecoverable(merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing"))
-				}
-
-				err := worker.Delete(ctx, &querypb.DeleteRequest{
-					Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
-					CollectionId: sd.collectionID,
-					PartitionId:  segmentEntry.PartitionID,
-					VchannelName: sd.vchannelName,
-					SegmentId:    segmentEntry.SegmentID,
-					PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
-					Timestamps:   delRecord.Timestamps,
-					Scope:        scope,
-				})
-				if errors.Is(err, merr.ErrNodeNotFound) {
-					log.Warn("try to delete data on non-exist node")
-					return retry.Unrecoverable(err)
-				} else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
-					log.Warn("try to delete data of released segment")
-					return nil
-				} else if err != nil {
-					log.Warn("worker failed to delete on segment",
-						zap.Error(err),
-					)
-					return err
-				}
-				return nil
-			}, retry.Attempts(10))
-			if err != nil {
-				log.Warn("apply delete for segment failed, marking it offline")
-				offlineSegments = append(offlineSegments, segmentEntry.SegmentID)
-			}
+			future := pool.Submit(func() (struct{}, error) {
+				log.Debug("delegator plan to applyDelete via worker")
+				err := retry.Handle(ctx, func() (bool, error) {
+					if sd.Stopped() {
+						return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
+					}
+
+					err := worker.Delete(ctx, &querypb.DeleteRequest{
+						Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
+						CollectionId: sd.collectionID,
+						PartitionId:  segmentEntry.PartitionID,
+						VchannelName: sd.vchannelName,
+						SegmentId:    segmentEntry.SegmentID,
+						PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
+						Timestamps:   delRecord.Timestamps,
+						Scope:        scope,
+					})
+					if errors.Is(err, merr.ErrNodeNotFound) {
+						log.Warn("try to delete data on non-exist node")
+						// cancel other request
+						cancel()
+						return false, err
+					} else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
+						log.Warn("try to delete data of released segment")
+						return false, nil
+					} else if err != nil {
+						log.Warn("worker failed to delete on segment", zap.Error(err))
+						return true, err
+					}
+					return false, nil
+				}, retry.Attempts(10))
+				if err != nil {
+					log.Warn("apply delete for segment failed, marking it offline")
+					offlineSegments.Insert(segmentEntry.SegmentID)
+				}
+				return struct{}{}, err
+			})
+			futures = append(futures, future)
 		}
 	}
-	return offlineSegments
+	conc.AwaitAll(futures...)
+	return offlineSegments.Collect()
 }
 
 // markSegmentOffline makes segment go offline and waits for QueryCoord to fix.
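The hunk above replaces the serial per-segment loop with a bounded pool: each segment's delete (together with its retry loop) is submitted as a task, failed segment IDs are collected in a concurrent set, and conc.AwaitAll joins the futures before the offline IDs are returned. For readers outside the Milvus codebase, here is a minimal sketch of the same fan-out shape written against golang.org/x/sync/errgroup rather than the internal conc pool. deleteOnWorker, applyDeleteParallel, and the hard-coded segment IDs are placeholders invented for this sketch, not the delegator's real API.

// Sketch only: bounded fan-out of per-segment deletes using
// golang.org/x/sync/errgroup. deleteOnWorker and applyDeleteParallel are
// placeholder names; the merged change uses the conc pool shown in the diff.
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"

	"golang.org/x/sync/errgroup"
)

// deleteOnWorker stands in for the per-segment delete RPC (worker.Delete in the diff).
func deleteOnWorker(ctx context.Context, segmentID int64) error {
	// ... issue the delete request for one segment, with retries ...
	return nil
}

// applyDeleteParallel fans the per-segment deletes out to a bounded number of
// goroutines and returns the IDs of segments whose delete ultimately failed.
func applyDeleteParallel(ctx context.Context, segmentIDs []int64) []int64 {
	var (
		mu      sync.Mutex
		offline []int64
	)

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(runtime.GOMAXPROCS(0) * 4) // same concurrency bound as the pool in the diff

	for _, id := range segmentIDs {
		id := id // capture the loop variable, as the diff does with segmentEntry
		g.Go(func() error {
			if err := deleteOnWorker(ctx, id); err != nil {
				mu.Lock()
				offline = append(offline, id)
				mu.Unlock()
			}
			// Return nil so one failed segment does not cancel its siblings;
			// failures are reported through the offline list instead.
			return nil
		})
	}
	_ = g.Wait() // every task returns nil, so Wait is only a join point here
	return offline
}

func main() {
	fmt.Println("offline segments:", applyDeleteParallel(context.Background(), []int64{1, 2, 3}))
}

The design point mirrored here is that a failing segment is recorded and reported rather than aborting the whole batch: in the diff, only merr.ErrNodeNotFound cancels the shared context (the target node is gone, so the remaining requests to it are pointless), while other errors are retried up to ten times and then simply mark that segment offline.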