mirror of https://github.com/milvus-io/milvus.git
fix: add filter to exclude L0 import jobs in compaction trigger (#40045)
- issue: #39849 Signed-off-by: SimFG <bang.fu@zilliz.com>pull/40083/head
parent
f0d3d98c3f
commit
b562f8e644
|
@ -380,7 +380,11 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
|
|||
|
||||
func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, collection *collectionInfo, view CompactionView) error {
|
||||
// add l0 import task for the collection if the collection is importing
|
||||
importJobs := m.imeta.GetJobBy(ctx, WithCollectionID(collection.ID), WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed))
|
||||
importJobs := m.imeta.GetJobBy(ctx,
|
||||
WithCollectionID(collection.ID),
|
||||
WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed),
|
||||
WithoutL0Job(),
|
||||
)
|
||||
if len(importJobs) > 0 {
|
||||
partitionID := view.GetGroupLabel().PartitionID
|
||||
var (
|
||||
|
@ -404,6 +408,9 @@ func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context,
|
|||
}
|
||||
}
|
||||
}
|
||||
if len(importPaths) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i, job := range importJobs {
|
||||
newTasks, err := NewImportTasks([][]*datapb.ImportFileStats{
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/util/importutilv2"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/proto/internalpb"
|
||||
|
@ -61,6 +62,12 @@ func WithoutJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
|
|||
}
|
||||
}
|
||||
|
||||
func WithoutL0Job() ImportJobFilter {
|
||||
return func(job ImportJob) bool {
|
||||
return !importutilv2.IsL0Import(job.GetOptions())
|
||||
}
|
||||
}
|
||||
|
||||
type UpdateJobAction func(job ImportJob)
|
||||
|
||||
func UpdateJobState(state internalpb.ImportJobState) UpdateJobAction {
|
||||
|
|
Loading…
Reference in New Issue