mirror of https://github.com/milvus-io/milvus.git
enhance:use proto.clone in compaction tasks to prevent omissions (#36624)
issue: #36623 Signed-off-by: wayblink <anyang.wang@zilliz.com>pull/36721/head
parent
1f47d5510b
commit
c7a81d5de3
|
@ -23,18 +23,18 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/session"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
|
@ -582,33 +582,7 @@ func (t *clusteringCompactionTask) doCompact() error {
|
|||
}
|
||||
|
||||
func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
|
||||
taskClone := &datapb.CompactionTask{
|
||||
PlanID: t.GetPlanID(),
|
||||
TriggerID: t.GetTriggerID(),
|
||||
State: t.GetState(),
|
||||
StartTime: t.GetStartTime(),
|
||||
EndTime: t.GetEndTime(),
|
||||
TimeoutInSeconds: t.GetTimeoutInSeconds(),
|
||||
Type: t.GetType(),
|
||||
CollectionTtl: t.CollectionTtl,
|
||||
CollectionID: t.GetCollectionID(),
|
||||
PartitionID: t.GetPartitionID(),
|
||||
Channel: t.GetChannel(),
|
||||
InputSegments: t.GetInputSegments(),
|
||||
ResultSegments: t.GetResultSegments(),
|
||||
TotalRows: t.TotalRows,
|
||||
Schema: t.Schema,
|
||||
NodeID: t.GetNodeID(),
|
||||
FailReason: t.GetFailReason(),
|
||||
RetryTimes: t.GetRetryTimes(),
|
||||
Pos: t.GetPos(),
|
||||
ClusteringKeyField: t.GetClusteringKeyField(),
|
||||
MaxSegmentRows: t.GetMaxSegmentRows(),
|
||||
PreferSegmentRows: t.GetPreferSegmentRows(),
|
||||
AnalyzeTaskID: t.GetAnalyzeTaskID(),
|
||||
AnalyzeVersion: t.GetAnalyzeVersion(),
|
||||
LastStateStartTime: t.GetLastStateStartTime(),
|
||||
}
|
||||
taskClone := proto.Clone(t).(*datapb.CompactionTask)
|
||||
for _, opt := range opts {
|
||||
opt(taskClone)
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/samber/lo"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
||||
|
@ -266,27 +267,7 @@ func (t *l0CompactionTask) CleanLogPath() {
|
|||
}
|
||||
|
||||
func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
|
||||
taskClone := &datapb.CompactionTask{
|
||||
PlanID: t.GetPlanID(),
|
||||
TriggerID: t.GetTriggerID(),
|
||||
State: t.GetState(),
|
||||
StartTime: t.GetStartTime(),
|
||||
EndTime: t.GetEndTime(),
|
||||
TimeoutInSeconds: t.GetTimeoutInSeconds(),
|
||||
Type: t.GetType(),
|
||||
CollectionTtl: t.CollectionTtl,
|
||||
CollectionID: t.GetCollectionID(),
|
||||
PartitionID: t.GetPartitionID(),
|
||||
Channel: t.GetChannel(),
|
||||
InputSegments: t.GetInputSegments(),
|
||||
ResultSegments: t.GetResultSegments(),
|
||||
TotalRows: t.TotalRows,
|
||||
Schema: t.Schema,
|
||||
NodeID: t.GetNodeID(),
|
||||
FailReason: t.GetFailReason(),
|
||||
RetryTimes: t.GetRetryTimes(),
|
||||
Pos: t.GetPos(),
|
||||
}
|
||||
taskClone := proto.Clone(t).(*datapb.CompactionTask)
|
||||
for _, opt := range opts {
|
||||
opt(taskClone)
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/samber/lo"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/session"
|
||||
|
@ -222,28 +223,7 @@ func (t *mixCompactionTask) processTimeout() bool {
|
|||
}
|
||||
|
||||
func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
|
||||
taskClone := &datapb.CompactionTask{
|
||||
PlanID: t.GetPlanID(),
|
||||
TriggerID: t.GetTriggerID(),
|
||||
State: t.GetState(),
|
||||
StartTime: t.GetStartTime(),
|
||||
EndTime: t.GetEndTime(),
|
||||
TimeoutInSeconds: t.GetTimeoutInSeconds(),
|
||||
Type: t.GetType(),
|
||||
CollectionTtl: t.CollectionTtl,
|
||||
CollectionID: t.GetCollectionID(),
|
||||
PartitionID: t.GetPartitionID(),
|
||||
Channel: t.GetChannel(),
|
||||
InputSegments: t.GetInputSegments(),
|
||||
ResultSegments: t.GetResultSegments(),
|
||||
TotalRows: t.TotalRows,
|
||||
Schema: t.Schema,
|
||||
NodeID: t.GetNodeID(),
|
||||
FailReason: t.GetFailReason(),
|
||||
RetryTimes: t.GetRetryTimes(),
|
||||
Pos: t.GetPos(),
|
||||
MaxSize: t.GetMaxSize(),
|
||||
}
|
||||
taskClone := proto.Clone(t).(*datapb.CompactionTask)
|
||||
for _, opt := range opts {
|
||||
opt(taskClone)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue