enhance: [2.5] Support detailed manual compaction criterion (#40892) (#40924)

Cherry-pick from master
pr: #40892
Related to #40866

This PR:
- update go-api/v2 and support partition id/channel/segment level manual
compaction
- refines the compaction trigger implementation
- unify the compaction signal usage

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/40946/head
congqixia 2025-03-27 10:36:22 +08:00 committed by GitHub
parent 534b628278
commit 2411c184a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 519 additions and 372 deletions

View File

@ -6,7 +6,7 @@ require (
github.com/blang/semver/v4 v4.0.0
github.com/cockroachdb/errors v1.9.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd
github.com/milvus-io/milvus/pkg/v2 v2.5.7
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/samber/lo v1.27.0

View File

@ -318,8 +318,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.7 h1:trg9Lri1K2JxluLXK7AR4iCLfXucPhWPVwAF6eMXNrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd h1:Vd+RIXg+yfl5w72LKWzPeNZH/c8cDO4K54Lf07B39+E=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg/v2 v2.5.7 h1:b45jq1s1v03AekFucs2/dkkXohB57gEx7gspJuAkfbY=
github.com/milvus-io/milvus/pkg/v2 v2.5.7/go.mod h1:pImw1IGNS7k/5yvlZV2tZi5vZu1VQRlQij+r39d+XnI=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=

2
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250319131803-68e8b224752b
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0

4
go.sum
View File

@ -630,8 +630,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250319131803-68e8b224752b h1:vCTi6z+V28LJHeY1X7Qwz22A58tza9ZzfbMwEpHlDU4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250319131803-68e8b224752b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd h1:Vd+RIXg+yfl5w72LKWzPeNZH/c8cDO4K54Lf07B39+E=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

View File

@ -8,6 +8,9 @@ dir: "{{.InterfaceDir}}"
packages:
github.com/milvus-io/milvus/internal/datacoord:
interfaces:
trigger:
config:
mockname: MockTrigger
compactionPlanContext:
config:
mockname: MockCompactionPlanContext

View File

@ -32,8 +32,8 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/logutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -49,21 +49,68 @@ type compactTime struct {
type trigger interface {
start()
stop()
// triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment
triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error
// triggerManualCompaction force to start a compaction
triggerManualCompaction(collectionID int64) (UniqueID, error)
TriggerCompaction(ctx context.Context, signal *compactionSignal) (signalID UniqueID, err error)
}
type compactionSignal struct {
id UniqueID
isForce bool
isGlobal bool
collectionID UniqueID
partitionID UniqueID
channel string
segmentID UniqueID
segmentIDs []UniqueID
pos *msgpb.MsgPosition
resultCh chan error
waitResult bool
}
func NewCompactionSignal() *compactionSignal {
return &compactionSignal{
resultCh: make(chan error, 1),
waitResult: true,
}
}
func (cs *compactionSignal) WithID(id UniqueID) *compactionSignal {
cs.id = id
return cs
}
func (cs *compactionSignal) WithIsForce(isForce bool) *compactionSignal {
cs.isForce = isForce
return cs
}
func (cs *compactionSignal) WithCollectionID(collectionID UniqueID) *compactionSignal {
cs.collectionID = collectionID
return cs
}
func (cs *compactionSignal) WithPartitionID(partitionID UniqueID) *compactionSignal {
cs.partitionID = partitionID
return cs
}
func (cs *compactionSignal) WithChannel(channel string) *compactionSignal {
cs.channel = channel
return cs
}
func (cs *compactionSignal) WithSegmentIDs(segmentIDs ...UniqueID) *compactionSignal {
cs.segmentIDs = segmentIDs
return cs
}
func (cs *compactionSignal) WithWaitResult(waitResult bool) *compactionSignal {
cs.waitResult = waitResult
return cs
}
func (cs *compactionSignal) Notify(result error) {
select {
case cs.resultCh <- result:
default:
}
}
var _ trigger = (*compactionTrigger)(nil)
@ -73,9 +120,9 @@ type compactionTrigger struct {
meta *meta
allocator allocator.Allocator
signals chan *compactionSignal
manualSignals chan *compactionSignal
compactionHandler compactionPlanContext
globalTrigger *time.Ticker
forceMu lock.Mutex
closeCh lifetime.SafeChan
closeWaiter sync.WaitGroup
@ -99,6 +146,7 @@ func newCompactionTrigger(
meta: meta,
allocator: allocator,
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
indexEngineVersionManager: indexVersionManager,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
@ -112,37 +160,19 @@ func (t *compactionTrigger) start() {
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
t.closeWaiter.Add(2)
go func() {
defer logutil.LogPanic()
defer t.closeWaiter.Done()
for {
select {
case <-t.closeCh.CloseCh():
log.Info("compaction trigger quit")
return
case signal := <-t.signals:
switch {
case signal.isGlobal:
// ManualCompaction also use use handleGlobalSignal
// so throw err here
err := t.handleGlobalSignal(signal)
if err != nil {
log.Warn("unable to handleGlobalSignal", zap.Error(err))
}
default:
// no need to handle err in handleSignal
t.handleSignal(signal)
}
}
}
t.work()
}()
go t.startGlobalCompactionLoop()
go func() {
defer t.closeWaiter.Done()
t.schedule()
}()
}
func (t *compactionTrigger) startGlobalCompactionLoop() {
// schedule method triggers global signal by configured interval.
func (t *compactionTrigger) schedule() {
defer logutil.LogPanic()
defer t.closeWaiter.Done()
// If AutoCompaction disabled, global loop will not start
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
@ -156,7 +186,9 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
log.Info("global compaction loop exit")
return
case <-t.globalTrigger.C:
err := t.triggerCompaction()
// default signal, all collections withi isGlobal = true
_, err := t.TriggerCompaction(context.Background(),
NewCompactionSignal())
if err != nil {
log.Warn("unable to triggerCompaction", zap.Error(err))
}
@ -164,6 +196,27 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
}
}
// work method listens the signal channels and generate plans from them.
func (t *compactionTrigger) work() {
defer logutil.LogPanic()
for {
var signal *compactionSignal
select {
case <-t.closeCh.CloseCh():
log.Info("compaction trigger quit")
return
case signal = <-t.signals:
case signal = <-t.manualSignals:
}
err := t.handleSignal(signal)
if err != nil {
log.Warn("unable to handleSignal", zap.Int64("signalID", signal.id), zap.Error(err))
}
signal.Notify(err)
}
}
func (t *compactionTrigger) stop() {
t.closeCh.Close()
t.closeWaiter.Wait()
@ -206,118 +259,92 @@ func getCompactTime(ts Timestamp, coll *collectionInfo) (*compactTime, error) {
return &compactTime{ts, 0, 0}, nil
}
// triggerCompaction trigger a compaction if any compaction condition satisfy.
func (t *compactionTrigger) triggerCompaction() error {
id, err := t.allocSignalID()
if err != nil {
return err
}
signal := &compactionSignal{
id: id,
isForce: false,
isGlobal: true,
}
t.signals <- signal
return nil
}
// triggerSingleCompaction trigger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error {
// TrigerCompaction is the public interface to send compaction signal to work queue.
// when waitResult = true, it waits until the result is returned from worker(via `signal.resultCh`)
// or the context is timeouted/canceled
// otherwise, it just try best to submit the signal to the channel, if the channel is full it just returns err
//
// by default, `signals` channel will be used to send compaction signal
// however, when the `isForce` flag is true, the `manualSignals` channel will be used to skip the queueing
// since manual signals shall have higher priority.
func (t *compactionTrigger) TriggerCompaction(ctx context.Context, signal *compactionSignal) (signalID UniqueID, err error) {
// If AutoCompaction disabled, flush request will not trigger compaction
if !paramtable.Get().DataCoordCfg.EnableAutoCompaction.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() {
return nil
return -1, nil
}
id, err := t.allocSignalID()
id, err := t.allocSignalID(ctx)
if err != nil {
return err
return -1, err
}
signal := &compactionSignal{
id: id,
isForce: false,
isGlobal: false,
collectionID: collectionID,
partitionID: partitionID,
segmentID: segmentID,
channel: channel,
signal.WithID(id)
signalCh := t.signals
// use force signal channel to skip non-force signal queue
if signal.isForce {
signalCh = t.manualSignals
}
if blockToSendSignal {
t.signals <- signal
return nil
// non force mode, try best to sent signal only
if !signal.waitResult {
select {
case signalCh <- signal:
default:
log.Info("no space to send compaction signal",
zap.Int64("collectionID", signal.collectionID),
zap.Int64s("segmentID", signal.segmentIDs),
zap.String("channel", signal.channel))
return -1, merr.WrapErrServiceUnavailable("signal channel is full")
}
return id, nil
}
// force flag make sure signal is handle and returns error if any
select {
case t.signals <- signal:
default:
log.Info("no space to send compaction signal", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID), zap.String("channel", channel))
case signalCh <- signal:
case <-ctx.Done():
return -1, ctx.Err()
}
return nil
select {
case err = <-signal.resultCh:
return id, err
case <-ctx.Done():
return -1, ctx.Err()
}
}
// triggerManualCompaction force to start a compaction
// invoked by user `ManualCompaction` operation
func (t *compactionTrigger) triggerManualCompaction(collectionID int64) (UniqueID, error) {
id, err := t.allocSignalID()
if err != nil {
return -1, err
}
signal := &compactionSignal{
id: id,
isForce: true,
isGlobal: true,
collectionID: collectionID,
}
err = t.handleGlobalSignal(signal)
if err != nil {
log.Warn("unable to handle compaction signal", zap.Error(err))
return -1, err
}
return id, nil
}
func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
func (t *compactionTrigger) allocSignalID(ctx context.Context) (UniqueID, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
return t.allocator.AllocID(ctx)
}
func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
t.forceMu.Lock()
defer t.forceMu.Unlock()
// handleSignal is the internal logic to convert compactionSignal into compaction tasks.
func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
log := log.With(zap.Int64("compactionID", signal.id),
zap.Int64("signal.collectionID", signal.collectionID),
zap.Int64("signal.partitionID", signal.partitionID),
zap.Int64("signal.segmentID", signal.segmentID))
filter := SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
isFlush(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
segment.GetLevel() != datapb.SegmentLevel_L2 && // ignore l2 segment
!segment.GetIsInvisible()
}) // partSegments is list of chanPartSegments, which is channel-partition organized segments
zap.Int64s("signal.segmentIDs", signal.segmentIDs))
partSegments := make([]*chanPartSegments, 0)
// get all segments if signal.collection == 0, otherwise get collection segments
if signal.collectionID != 0 {
partSegments = GetSegmentsChanPart(t.meta, signal.collectionID, filter)
} else {
collections := t.meta.GetCollections()
for _, collection := range collections {
partSegments = append(partSegments, GetSegmentsChanPart(t.meta, collection.ID, filter)...)
}
if !signal.isForce && t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full")
return merr.WrapErrServiceQuotaExceeded("compaction handler full")
}
if len(partSegments) == 0 {
log.Info("the length of SegmentsChanPart is 0, skip to handle compaction")
groups, err := t.getCandidates(signal)
if err != nil {
log.Warn("handle signal failed, get candidates return error", zap.Error(err))
return err
}
if len(groups) == 0 {
log.Info("the length of candidate group is 0, skip to handle signal")
return nil
}
for _, group := range partSegments {
for _, group := range groups {
log := log.With(zap.Int64("collectionID", group.collectionID),
zap.Int64("partitionID", group.partitionID),
zap.String("channel", group.channelName))
@ -405,116 +432,6 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
return nil
}
// handleSignal processes segment flush caused partition-chan level compaction signal
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
t.forceMu.Lock()
defer t.forceMu.Unlock()
// 1. check whether segment's binlogs should be compacted or not
if t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full")
return
}
segment := t.meta.GetHealthySegment(context.TODO(), signal.segmentID)
if segment == nil {
log.Warn("segment in compaction signal not found in meta", zap.Int64("segmentID", signal.segmentID))
return
}
channel := segment.GetInsertChannel()
partitionID := segment.GetPartitionID()
collectionID := segment.GetCollectionID()
segments := t.getCandidateSegments(channel, partitionID)
if len(segments) == 0 {
log.Info("the number of candidate segments is 0, skip to handle compaction")
return
}
coll, err := t.getCollection(collectionID)
if err != nil {
log.Warn("get collection info failed, skip handling compaction",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.String("channel", channel),
zap.Error(err),
)
return
}
if !signal.isForce && !isCollectionAutoCompactionEnabled(coll) {
log.RatedInfo(20, "collection auto compaction disabled",
zap.Int64("collectionID", collectionID),
)
return
}
ts := tsoutil.ComposeTSByTime(time.Now(), 0)
ct, err := getCompactTime(ts, coll)
if err != nil {
log.Warn("get compact time failed, skip to handle compaction", zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", partitionID), zap.String("channel", channel))
return
}
expectedSize := getExpectedSegmentSize(t.meta, coll)
plans := t.generatePlans(segments, signal, ct, expectedSize)
for _, plan := range plans {
if t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID))
break
}
// TODO[GOOSE], 11 = 1 planID + 10 segmentID, this is a hack need to be removed.
// Any plan that output segment number greater than 10 will be marked as invalid plan for now.
startID, endID, err := t.allocator.AllocN(11)
if err != nil {
log.Warn("fail to allocate id", zap.Error(err))
return
}
totalRows, inputSegmentIDs := plan.A, plan.B
start := time.Now()
pts, _ := tsoutil.ParseTS(ct.startTime)
task := &datapb.CompactionTask{
PlanID: startID,
TriggerID: signal.id,
State: datapb.CompactionTaskState_pipelining,
StartTime: pts.Unix(),
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
CollectionTtl: ct.collectionTTL.Nanoseconds(),
CollectionID: collectionID,
PartitionID: partitionID,
Channel: channel,
InputSegments: inputSegmentIDs,
ResultSegments: []int64{},
TotalRows: totalRows,
Schema: coll.Schema,
MaxSize: getExpandedSize(expectedSize),
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: startID + 1,
End: endID,
},
}
if err := t.compactionHandler.enqueueCompaction(task); err != nil {
log.Warn("failed to execute compaction task",
zap.Int64("collection", collectionID),
zap.Int64("triggerID", signal.id),
zap.Int64("planID", task.GetPlanID()),
zap.Int64s("inputSegments", inputSegmentIDs),
zap.Error(err))
continue
}
log.Info("time cost of generating compaction",
zap.Int64("planID", task.GetPlanID()),
zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID),
zap.String("channel", channel),
zap.Int64("partitionID", partitionID),
zap.Int64s("inputSegmentIDs", inputSegmentIDs))
}
}
func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compactionSignal, compactTime *compactTime, expectedSize int64) []*typeutil.Pair[int64, []int64] {
if len(segments) == 0 {
log.Warn("the number of candidate segments is 0, skip to generate compaction plan")
@ -626,28 +543,71 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
return tasks
}
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
segments := t.meta.GetSegmentsByChannel(channel)
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
segments = FilterInIndexedSegments(t.handler, t.meta, false, segments...)
// getCandidates converts signal criterion into corresponding compaction candidate groups
// since non-major compaction happens under channel+partition level
// the selected segments are grouped into these categories.
func (t *compactionTrigger) getCandidates(signal *compactionSignal) ([]chanPartSegments, error) {
// default filter, select segments which could be compacted
filters := []SegmentFilter{
SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
isFlush(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
segment.GetLevel() != datapb.SegmentLevel_L2 && // ignore l2 segment
!segment.GetIsInvisible()
}),
}
var res []*SegmentInfo
for _, s := range segments {
if !isSegmentHealthy(s) ||
!isFlush(s) ||
s.GetInsertChannel() != channel ||
s.GetPartitionID() != partitionID ||
s.isCompacting ||
s.GetIsImporting() ||
s.GetLevel() == datapb.SegmentLevel_L0 ||
s.GetLevel() == datapb.SegmentLevel_L2 {
continue
// add segment filter if criterion provided
if signal.collectionID > 0 {
filters = append(filters, WithCollection(signal.collectionID))
}
if signal.channel != "" {
filters = append(filters, WithChannel(signal.channel))
}
if signal.partitionID > 0 {
filters = append(filters, SegmentFilterFunc(func(si *SegmentInfo) bool {
return si.GetPartitionID() == signal.partitionID
}))
}
// segment id provided
// select these segments only
if len(signal.segmentIDs) > 0 {
idSet := typeutil.NewSet(signal.segmentIDs...)
filters = append(filters, SegmentFilterFunc(func(si *SegmentInfo) bool {
return idSet.Contain(si.GetID())
}))
}
segments := t.meta.SelectSegments(context.TODO(), filters...)
// some criterion not met or conflicted
if len(signal.segmentIDs) > 0 && len(segments) != len(signal.segmentIDs) {
return nil, merr.WrapErrServiceInternal("not all segment ids provided could be compacted")
}
type category struct {
collectionID int64
partitionID int64
channelName string
}
groups := lo.GroupBy(segments, func(segment *SegmentInfo) category {
return category{
collectionID: segment.CollectionID,
partitionID: segment.PartitionID,
channelName: segment.InsertChannel,
}
res = append(res, s)
}
})
return res
return lo.MapToSlice(groups, func(c category, segments []*SegmentInfo) chanPartSegments {
return chanPartSegments{
collectionID: c.collectionID,
partitionID: c.partitionID,
channelName: c.channelName,
segments: segments,
}
}), nil
}
func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo, expectedSize int64) bool {

View File

@ -180,14 +180,21 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
meta: m,
handler: newMockHandlerWithMeta(m),
allocator: newMock0Allocator(t),
signals: nil,
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
globalTrigger: nil,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.closeWaiter.Add(1)
go func() {
defer tr.closeWaiter.Done()
tr.work()
}()
defer tr.stop()
_, err := tr.triggerManualCompaction(collectionID)
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal().WithCollectionID(collectionID).WithIsForce(true))
assert.NoError(t, err)
select {
@ -628,7 +635,8 @@ func Test_compactionTrigger_force(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
@ -636,7 +644,13 @@ func Test_compactionTrigger_force(t *testing.T) {
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
_, err := tr.triggerManualCompaction(tt.collectionID)
tr.closeWaiter.Add(1)
go func() {
defer tr.closeWaiter.Done()
tr.work()
}()
defer tr.stop()
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal().WithCollectionID(tt.collectionID).WithIsForce(true))
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
select {
@ -666,7 +680,8 @@ func Test_compactionTrigger_force(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
@ -674,8 +689,14 @@ func Test_compactionTrigger_force(t *testing.T) {
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.closeWaiter.Add(1)
go func() {
defer tr.closeWaiter.Done()
tr.work()
}()
defer tr.stop()
tt.collectionID = 1000
_, err := tr.triggerManualCompaction(tt.collectionID)
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal().WithCollectionID(tt.collectionID).WithIsForce(true))
assert.Equal(t, tt.wantErr, err != nil)
// expect max row num = 2048*1024*1024/(128*4) = 4194304
// assert.EqualValues(t, 4194304, tt.fields.meta.segments.GetSegments()[0].MaxRowNum)
@ -698,7 +719,8 @@ func Test_compactionTrigger_force(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
@ -706,16 +728,17 @@ func Test_compactionTrigger_force(t *testing.T) {
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.closeWaiter.Add(1)
go func() {
defer tr.closeWaiter.Done()
tr.work()
}()
defer tr.stop()
{
// test getCompactTime fail for handle global signal
signal := &compactionSignal{
id: 0,
isForce: true,
isGlobal: true,
collectionID: 1111,
}
tr.handleGlobalSignal(signal)
signal := NewCompactionSignal().WithCollectionID(1111).WithIsForce(true)
tr.TriggerCompaction(context.TODO(), signal)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
hasPlan := true
@ -730,13 +753,8 @@ func Test_compactionTrigger_force(t *testing.T) {
{
// test getCompactTime fail for handle signal
signal := &compactionSignal{
id: 0,
isForce: true,
collectionID: 1111,
segmentID: 3,
}
tr.handleSignal(signal)
signal := NewCompactionSignal().WithCollectionID(1111).WithIsForce(true).WithSegmentIDs(3)
tr.TriggerCompaction(context.TODO(), signal)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
hasPlan := true
@ -951,7 +969,8 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
@ -959,7 +978,13 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
_, err := tr.triggerManualCompaction(tt.args.collectionID)
tr.closeWaiter.Add(1)
go func() {
defer tr.closeWaiter.Done()
tr.work()
}()
defer tr.stop()
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal().WithCollectionID(tt.args.collectionID).WithIsForce(true))
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
@ -1106,7 +1131,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
@ -1116,7 +1141,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
}
tr.start()
defer tr.stop()
err := tr.triggerCompaction()
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal())
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
select {
@ -1301,7 +1326,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
closeCh: lifetime.NewSafeChan(),
@ -1309,7 +1334,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
}
tr.start()
defer tr.stop()
err := tr.triggerCompaction()
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal())
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
select {
@ -1444,7 +1469,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
@ -1455,7 +1480,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
}
tr.start()
defer tr.stop()
err := tr.triggerCompaction()
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal())
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
select {
@ -1587,7 +1612,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
@ -1598,7 +1623,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
}
tr.start()
defer tr.stop()
err := tr.triggerCompaction()
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal())
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
select {
@ -1774,7 +1799,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
signals: make(chan *compactionSignal, 100),
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
@ -1783,7 +1808,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
}
tr.start()
defer tr.stop()
err := tr.triggerCompaction()
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal())
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
@ -2077,7 +2102,7 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) {
assert.NotNil(t, ct)
}
func Test_triggerSingleCompaction(t *testing.T) {
func Test_TirggerCompaction_WaitResult(t *testing.T) {
originValue := Params.DataCoordCfg.EnableAutoCompaction.GetValue()
Params.Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "true")
defer func() {
@ -2095,12 +2120,22 @@ func Test_triggerSingleCompaction(t *testing.T) {
}, newMockVersionManager())
got.signals = make(chan *compactionSignal, 1)
{
err := got.triggerSingleCompaction(1, 1, 1, "a", false)
_, err := got.TriggerCompaction(context.TODO(), NewCompactionSignal().
WithCollectionID(1).
WithPartitionID(1).
WithSegmentIDs(1).
WithChannel("a").
WithWaitResult(false))
assert.NoError(t, err)
}
{
err := got.triggerSingleCompaction(2, 2, 2, "b", false)
assert.NoError(t, err)
_, err := got.TriggerCompaction(context.TODO(), NewCompactionSignal().
WithCollectionID(2).
WithPartitionID(2).
WithSegmentIDs(2).
WithChannel("b").
WithWaitResult(false))
assert.Error(t, err)
}
var i satomic.Value
i.Store(0)
@ -2119,10 +2154,6 @@ func Test_triggerSingleCompaction(t *testing.T) {
check()
assert.Equal(t, 1, i.Load().(int))
{
err := got.triggerSingleCompaction(3, 3, 3, "c", true)
assert.NoError(t, err)
}
var j satomic.Value
j.Store(0)
go func() {
@ -2138,13 +2169,30 @@ func Test_triggerSingleCompaction(t *testing.T) {
} else if x == 1 {
assert.EqualValues(t, 4, signal.collectionID)
}
signal.Notify(nil)
case <-timeoutCtx.Done():
return
}
}
}()
{
err := got.triggerSingleCompaction(4, 4, 4, "d", true)
_, err := got.TriggerCompaction(context.TODO(), NewCompactionSignal().
WithCollectionID(3).
WithPartitionID(3).
WithSegmentIDs(3).
WithChannel("c").
WithWaitResult(false))
assert.NoError(t, err)
}
{
_, err := got.TriggerCompaction(context.TODO(), NewCompactionSignal().
WithCollectionID(4).
WithPartitionID(4).
WithSegmentIDs(4).
WithChannel("d").
WithWaitResult(true))
assert.NoError(t, err)
}
assert.Eventually(t, func() bool {
@ -2354,7 +2402,6 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
// s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil)
s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked"))
tr.handleSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
@ -2383,7 +2430,6 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
},
}, nil)
tr.handleSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
@ -2413,7 +2459,6 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
},
}, nil)
tr.handleSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
@ -2449,7 +2494,6 @@ func (s *CompactionTriggerSuite) TestHandleSignal() {
}, nil)
s.compactionHandler.EXPECT().enqueueCompaction(mock.Anything).Return(nil)
tr.handleSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
@ -2483,21 +2527,17 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
},
},
}
s.Run("getCompaction_failed", func() {
s.Run("GetCollection_failed", func() {
defer s.SetupTest()
tr := s.tr
s.compactionHandler.EXPECT().isFull().Return(false)
// s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil)
s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked"))
tr.handleGlobalSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
isForce: false,
})
// suite shall check compactionHandler.enqueueCompaction never called
err := tr.handleSignal(NewCompactionSignal().
WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithChannel(s.channel))
s.Error(err)
})
s.Run("collectionAutoCompactionConfigError", func() {
@ -2511,45 +2551,36 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
common.CollectionAutoCompactionKey: "bad_value",
},
}, nil)
tr.handleGlobalSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
isForce: false,
})
// suite shall check compactionHandler.enqueueCompaction never called
s.NotPanics(func() {
err := tr.handleSignal(NewCompactionSignal().
WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithChannel(s.channel))
s.NoError(err)
}, "bad configuration shall not cause panicking")
})
s.Run("collectionAutoCompactionDisabled", func() {
defer s.SetupTest()
tr := s.tr
s.compactionHandler.EXPECT().isFull().Return(false)
// s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil)
s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{
Schema: schema,
Properties: map[string]string{
common.CollectionAutoCompactionKey: "false",
},
}, nil)
tr.handleGlobalSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
isForce: false,
})
// suite shall check compactionHandler.enqueueCompaction never called
err := tr.handleSignal(NewCompactionSignal().
WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithChannel(s.channel))
s.NoError(err)
})
s.Run("collectionAutoCompactionDisabled_force", func() {
defer s.SetupTest()
tr := s.tr
// s.compactionHandler.EXPECT().isFull().Return(false)
// s.allocator.EXPECT().AllocTimestamp(mock.Anything).Return(10000, nil)
// s.allocator.EXPECT().AllocID(mock.Anything).Return(20000, nil).Maybe()
s.compactionHandler.EXPECT().isFull().Return(false)
start := int64(20000)
s.allocator.EXPECT().AllocN(mock.Anything).RunAndReturn(func(i int64) (int64, int64, error) {
return start, start + i, nil
@ -2561,14 +2592,11 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
common.CollectionAutoCompactionKey: "false",
},
}, nil)
// s.compactionHandler.EXPECT().enqueueCompaction(mock.Anything).Return(nil)
tr.handleGlobalSignal(&compactionSignal{
segmentID: 1,
collectionID: s.collectionID,
partitionID: s.partitionID,
channel: s.channel,
isForce: true,
})
err := tr.handleSignal(NewCompactionSignal().
WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithChannel(s.channel))
s.NoError(err)
})
}

View File

@ -0,0 +1,157 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
package datacoord
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockTrigger is an autogenerated mock type for the trigger type
type MockTrigger struct {
mock.Mock
}
type MockTrigger_Expecter struct {
mock *mock.Mock
}
func (_m *MockTrigger) EXPECT() *MockTrigger_Expecter {
return &MockTrigger_Expecter{mock: &_m.Mock}
}
// TriggerCompaction provides a mock function with given fields: ctx, signal
func (_m *MockTrigger) TriggerCompaction(ctx context.Context, signal *compactionSignal) (int64, error) {
ret := _m.Called(ctx, signal)
if len(ret) == 0 {
panic("no return value specified for TriggerCompaction")
}
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *compactionSignal) (int64, error)); ok {
return rf(ctx, signal)
}
if rf, ok := ret.Get(0).(func(context.Context, *compactionSignal) int64); ok {
r0 = rf(ctx, signal)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(context.Context, *compactionSignal) error); ok {
r1 = rf(ctx, signal)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockTrigger_TriggerCompaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TriggerCompaction'
type MockTrigger_TriggerCompaction_Call struct {
*mock.Call
}
// TriggerCompaction is a helper method to define mock.On call
// - ctx context.Context
// - signal *compactionSignal
func (_e *MockTrigger_Expecter) TriggerCompaction(ctx interface{}, signal interface{}) *MockTrigger_TriggerCompaction_Call {
return &MockTrigger_TriggerCompaction_Call{Call: _e.mock.On("TriggerCompaction", ctx, signal)}
}
func (_c *MockTrigger_TriggerCompaction_Call) Run(run func(ctx context.Context, signal *compactionSignal)) *MockTrigger_TriggerCompaction_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*compactionSignal))
})
return _c
}
func (_c *MockTrigger_TriggerCompaction_Call) Return(signalID int64, err error) *MockTrigger_TriggerCompaction_Call {
_c.Call.Return(signalID, err)
return _c
}
func (_c *MockTrigger_TriggerCompaction_Call) RunAndReturn(run func(context.Context, *compactionSignal) (int64, error)) *MockTrigger_TriggerCompaction_Call {
_c.Call.Return(run)
return _c
}
// start provides a mock function with given fields:
func (_m *MockTrigger) start() {
_m.Called()
}
// MockTrigger_start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'start'
type MockTrigger_start_Call struct {
*mock.Call
}
// start is a helper method to define mock.On call
func (_e *MockTrigger_Expecter) start() *MockTrigger_start_Call {
return &MockTrigger_start_Call{Call: _e.mock.On("start")}
}
func (_c *MockTrigger_start_Call) Run(run func()) *MockTrigger_start_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockTrigger_start_Call) Return() *MockTrigger_start_Call {
_c.Call.Return()
return _c
}
func (_c *MockTrigger_start_Call) RunAndReturn(run func()) *MockTrigger_start_Call {
_c.Call.Return(run)
return _c
}
// stop provides a mock function with given fields:
func (_m *MockTrigger) stop() {
_m.Called()
}
// MockTrigger_stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'stop'
type MockTrigger_stop_Call struct {
*mock.Call
}
// stop is a helper method to define mock.On call
func (_e *MockTrigger_Expecter) stop() *MockTrigger_stop_Call {
return &MockTrigger_stop_Call{Call: _e.mock.On("stop")}
}
func (_c *MockTrigger_stop_Call) Run(run func()) *MockTrigger_stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockTrigger_stop_Call) Return() *MockTrigger_stop_Call {
_c.Call.Return()
return _c
}
func (_c *MockTrigger_stop_Call) RunAndReturn(run func()) *MockTrigger_stop_Call {
_c.Call.Return(run)
return _c
}
// NewMockTrigger creates a new instance of MockTrigger. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockTrigger(t interface {
mock.TestingT
Cleanup(func())
}) *MockTrigger {
mock := &MockTrigger{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -1724,13 +1724,9 @@ func TestManualCompaction(t *testing.T) {
t.Run("test manual compaction successfully", func(t *testing.T) {
svr := &Server{allocator: allocator.NewMockAllocator(t)}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.compactionTrigger = &mockCompactionTrigger{
methods: map[string]interface{}{
"triggerManualCompaction": func(collectionID int64) (UniqueID, error) {
return 1, nil
},
},
}
mockTrigger := NewMockTrigger(t)
svr.compactionTrigger = mockTrigger
mockTrigger.EXPECT().TriggerCompaction(mock.Anything, mock.Anything).Return(1, nil)
mockHandler := NewMockCompactionPlanContext(t)
mockHandler.EXPECT().getCompactionTasksNumBySignalID(mock.Anything).Return(1)
@ -1746,13 +1742,10 @@ func TestManualCompaction(t *testing.T) {
t.Run("test manual compaction failure", func(t *testing.T) {
svr := &Server{allocator: allocator.NewMockAllocator(t)}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.compactionTrigger = &mockCompactionTrigger{
methods: map[string]interface{}{
"triggerManualCompaction": func(collectionID int64) (UniqueID, error) {
return 0, errors.New("mock error")
},
},
}
mockTrigger := NewMockTrigger(t)
svr.compactionTrigger = mockTrigger
mockTrigger.EXPECT().TriggerCompaction(mock.Anything, mock.Anything).Return(0, errors.New("mock error"))
resp, err := svr.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{
CollectionID: 1,
Timetravel: 1,
@ -1764,13 +1757,9 @@ func TestManualCompaction(t *testing.T) {
t.Run("test manual compaction with closed server", func(t *testing.T) {
svr := &Server{}
svr.stateCode.Store(commonpb.StateCode_Abnormal)
svr.compactionTrigger = &mockCompactionTrigger{
methods: map[string]interface{}{
"triggerManualCompaction": func(collectionID int64) (UniqueID, error) {
return 1, nil
},
},
}
mockTrigger := NewMockTrigger(t)
svr.compactionTrigger = mockTrigger
mockTrigger.EXPECT().TriggerCompaction(mock.Anything, mock.Anything).Return(1, nil).Maybe()
resp, err := svr.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{
CollectionID: 1,

View File

@ -587,8 +587,12 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
s.flushCh <- req.SegmentID
// notify compaction
err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
req.GetSegmentID(), req.GetChannel(), false)
_, err := s.compactionTrigger.TriggerCompaction(ctx,
NewCompactionSignal().
WithWaitResult(false).
WithCollectionID(req.GetCollectionID()).
WithPartitionID(req.GetPartitionID()).
WithChannel(req.GetChannel()))
if err != nil {
log.Warn("failed to trigger single compaction")
}
@ -1157,7 +1161,13 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
if req.MajorCompaction {
id, err = s.compactionTriggerManager.ManualTrigger(ctx, req.CollectionID, req.GetMajorCompaction())
} else {
id, err = s.compactionTrigger.triggerManualCompaction(req.CollectionID)
id, err = s.compactionTrigger.TriggerCompaction(ctx, NewCompactionSignal().
WithIsForce(true).
WithCollectionID(req.GetCollectionID()).
WithPartitionID(req.GetPartitionId()).
WithChannel(req.GetChannel()).
WithSegmentIDs(req.GetSegmentIds()...),
)
}
if err != nil {
log.Error("failed to trigger manual compaction", zap.Error(err))

View File

@ -14,7 +14,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250319131803-68e8b224752b
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2

View File

@ -488,8 +488,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250319131803-68e8b224752b h1:vCTi6z+V28LJHeY1X7Qwz22A58tza9ZzfbMwEpHlDU4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250319131803-68e8b224752b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd h1:Vd+RIXg+yfl5w72LKWzPeNZH/c8cDO4K54Lf07B39+E=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=

View File

@ -8,6 +8,7 @@ require (
github.com/milvus-io/milvus/client/v2 v2.0.0-20241125024034-0b9edb62a92d
github.com/milvus-io/milvus/pkg/v2 v2.5.7
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/samber/lo v1.27.0
github.com/stretchr/testify v1.9.0
github.com/x448/float16 v0.8.4
go.uber.org/zap v1.27.0
@ -52,7 +53,7 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.7 // indirect
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
@ -65,7 +66,6 @@ require (
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/samber/lo v1.27.0 // indirect
github.com/shirou/gopsutil/v3 v3.22.9 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect

View File

@ -318,8 +318,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.7 h1:trg9Lri1K2JxluLXK7AR4iCLfXucPhWPVwAF6eMXNrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd h1:Vd+RIXg+yfl5w72LKWzPeNZH/c8cDO4K54Lf07B39+E=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8-0.20250326031212-c94bad0122dd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg/v2 v2.5.7 h1:b45jq1s1v03AekFucs2/dkkXohB57gEx7gspJuAkfbY=
github.com/milvus-io/milvus/pkg/v2 v2.5.7/go.mod h1:pImw1IGNS7k/5yvlZV2tZi5vZu1VQRlQij+r39d+XnI=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=