fix: Refine compactionTask to avoid data race (#36936)

issue: #36897

Signed-off-by: wayblink <anyang.wang@zilliz.com>
wayblink 2024-10-24 09:55:28 +08:00 committed by GitHub
parent 92361c4efc
commit 49b562207c
13 changed files with 1083 additions and 1273 deletions
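
The change running through every file below is the same one: the compaction task structs used to embed *datapb.CompactionTask directly, so scheduler goroutines read fields such as PlanID, State and Channel while the task's own goroutine overwrote them. The diff moves that proto behind an atomic.Value and routes every read through GetTaskProto(), which returns a consistent snapshot. Below is a minimal, hypothetical sketch of that pattern; the struct and field names are illustrative stand-ins, not the real datapb types.

package main

import (
	"fmt"
	"sync/atomic"
)

// taskProto stands in for *datapb.CompactionTask in this sketch.
type taskProto struct {
	PlanID    int64
	TriggerID int64
	Channel   string
}

type compactionTask struct {
	taskProto atomic.Value // holds *taskProto
}

// GetTaskProto returns the current snapshot; callers read fields from it
// but never mutate it in place.
func (t *compactionTask) GetTaskProto() *taskProto {
	v := t.taskProto.Load()
	if v == nil {
		return nil
	}
	return v.(*taskProto)
}

// SetTask atomically publishes a new snapshot.
func (t *compactionTask) SetTask(p *taskProto) {
	t.taskProto.Store(p)
}

func main() {
	t := &compactionTask{}
	t.SetTask(&taskProto{PlanID: 3, TriggerID: 1, Channel: "ch-1"})

	// Reader side: one Load, then as many field reads as needed, the same
	// shape as t.GetTaskProto().GetPlanID() throughout the diff below.
	p := t.GetTaskProto()
	fmt.Println(p.PlanID, p.Channel)
}

A reader does a single Load and may then touch any number of fields on that snapshot; writers never mutate a published snapshot, they publish a fresh one, so no lock is needed on the read path.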

View File

@ -163,13 +163,13 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo {
func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int {
cnt := 0
c.queueTasks.ForEach(func(ct CompactionTask) {
if ct.GetTriggerID() == triggerID {
if ct.GetTaskProto().GetTriggerID() == triggerID {
cnt += 1
}
})
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if t.GetTriggerID() == triggerID {
if t.GetTaskProto().GetTriggerID() == triggerID {
cnt += 1
}
}
@ -203,14 +203,14 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
c.executingGuard.RLock()
for _, t := range c.executingTasks {
switch t.GetType() {
switch t.GetTaskProto().GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
l0ChannelExcludes.Insert(t.GetChannel())
l0ChannelExcludes.Insert(t.GetTaskProto().GetChannel())
case datapb.CompactionType_MixCompaction:
mixChannelExcludes.Insert(t.GetChannel())
mixChannelExcludes.Insert(t.GetTaskProto().GetChannel())
mixLabelExcludes.Insert(t.GetLabel())
case datapb.CompactionType_ClusteringCompaction:
clusterChannelExcludes.Insert(t.GetChannel())
clusterChannelExcludes.Insert(t.GetTaskProto().GetChannel())
clusterLabelExcludes.Insert(t.GetLabel())
}
}
@ -240,39 +240,39 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
return selected
}
switch t.GetType() {
switch t.GetTaskProto().GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
if mixChannelExcludes.Contain(t.GetChannel()) {
if mixChannelExcludes.Contain(t.GetTaskProto().GetChannel()) {
excluded = append(excluded, t)
continue
}
l0ChannelExcludes.Insert(t.GetChannel())
l0ChannelExcludes.Insert(t.GetTaskProto().GetChannel())
selected = append(selected, t)
case datapb.CompactionType_MixCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) {
if l0ChannelExcludes.Contain(t.GetTaskProto().GetChannel()) {
excluded = append(excluded, t)
continue
}
mixChannelExcludes.Insert(t.GetChannel())
mixChannelExcludes.Insert(t.GetTaskProto().GetChannel())
mixLabelExcludes.Insert(t.GetLabel())
selected = append(selected, t)
case datapb.CompactionType_ClusteringCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) ||
if l0ChannelExcludes.Contain(t.GetTaskProto().GetChannel()) ||
mixLabelExcludes.Contain(t.GetLabel()) ||
clusterLabelExcludes.Contain(t.GetLabel()) {
excluded = append(excluded, t)
continue
}
clusterChannelExcludes.Insert(t.GetChannel())
clusterChannelExcludes.Insert(t.GetTaskProto().GetChannel())
clusterLabelExcludes.Insert(t.GetLabel())
selected = append(selected, t)
}
c.executingGuard.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
}
return selected
}
@ -314,19 +314,19 @@ func (c *compactionPlanHandler) loadMeta() {
if t.NeedReAssignNodeID() {
c.submitTask(t)
log.Info("compactionPlanHandler loadMeta submitTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()),
zap.String("type", task.GetType().String()),
zap.String("state", t.GetState().String()))
zap.String("state", t.GetTaskProto().GetState().String()))
} else {
c.restoreTask(t)
log.Info("compactionPlanHandler loadMeta restoreTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()),
zap.String("type", task.GetType().String()),
zap.String("state", t.GetState().String()))
zap.String("state", t.GetTaskProto().GetState().String()))
}
}
}
@ -474,13 +474,13 @@ func (c *compactionPlanHandler) stop() {
func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
log.Info("removing tasks by channel", zap.String("channel", channel))
c.queueTasks.RemoveAll(func(task CompactionTask) bool {
if task.GetChannel() == channel {
if task.GetTaskProto().GetChannel() == channel {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel),
zap.Int64("planID", task.GetPlanID()),
zap.Int64("node", task.GetNodeID()),
zap.Int64("planID", task.GetTaskProto().GetPlanID()),
zap.Int64("node", task.GetTaskProto().GetNodeID()),
)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetTaskProto().GetNodeID()), task.GetTaskProto().GetType().String(), metrics.Pending).Dec()
return true
}
return false
@ -489,42 +489,42 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.executingGuard.Lock()
for id, task := range c.executingTasks {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel), zap.Int64("planID", id), zap.Any("task_channel", task.GetChannel()))
if task.GetChannel() == channel {
zap.String("channel", channel), zap.Int64("planID", id), zap.Any("task_channel", task.GetTaskProto().GetChannel()))
if task.GetTaskProto().GetChannel() == channel {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel),
zap.Int64("planID", task.GetPlanID()),
zap.Int64("node", task.GetNodeID()),
zap.Int64("planID", task.GetTaskProto().GetPlanID()),
zap.Int64("node", task.GetTaskProto().GetNodeID()),
)
delete(c.executingTasks, id)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetTaskProto().GetNodeID()), task.GetTaskProto().GetType().String(), metrics.Executing).Dec()
}
}
c.executingGuard.Unlock()
}
func (c *compactionPlanHandler) submitTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType()))
t.SetSpan(span)
c.queueTasks.Enqueue(t)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc()
}
// restoreTask used to restore Task from etcd
func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType()))
t.SetSpan(span)
c.executingGuard.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
}
// getCompactionTask return compaction
func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask {
var t CompactionTask = nil
c.queueTasks.ForEach(func(task CompactionTask) {
if task.GetPlanID() == planID {
if task.GetTaskProto().GetPlanID() == planID {
t = task
}
})
@ -554,7 +554,7 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err = t.SaveTaskMeta()
if err != nil {
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
c.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
return err
}
@ -596,20 +596,20 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
nodeID, useSlot := c.pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("compactionHandler cannot find datanode for compaction task",
zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel()))
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()), zap.String("vchannel", t.GetTaskProto().GetChannel()))
continue
}
err := t.SetNodeID(nodeID)
if err != nil {
log.Info("compactionHandler assignNodeID failed",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err))
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Error(err))
} else {
// update the input nodeSlots
slots[nodeID] = slots[nodeID] - useSlot
log.Info("compactionHandler assignNodeID success",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID))
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Any("nodeID", nodeID))
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
}
}
}
@ -644,9 +644,9 @@ func (c *compactionPlanHandler) checkCompaction() error {
// delete all finished
c.executingGuard.Lock()
for _, t := range finishedTasks {
delete(c.executingTasks, t.GetPlanID())
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc()
delete(c.executingTasks, t.GetTaskProto().GetPlanID())
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Done).Inc()
}
c.executingGuard.Unlock()
return nil
@ -658,7 +658,7 @@ func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task Comp
useSlot = task.GetSlotUsage()
if useSlot <= 0 {
log.Warn("task slot should not be 0", zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()))
log.Warn("task slot should not be 0", zap.Int64("planID", task.GetTaskProto().GetPlanID()), zap.String("type", task.GetTaskProto().GetType().String()))
return NullNodeID, useSlot
}
@ -673,9 +673,9 @@ func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task Comp
}
func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t CompactionTask) int64 {
nodeID, err := c.chManager.FindWatcher(t.GetChannel())
nodeID, err := c.chManager.FindWatcher(t.GetTaskProto().GetChannel())
if err != nil {
log.Info("failed to find watcher", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Info("failed to find watcher", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return NullNodeID
}
@ -692,16 +692,16 @@ func (c *compactionPlanHandler) isFull() bool {
func (c *compactionPlanHandler) checkDelay(t CompactionTask) {
log := log.Ctx(context.TODO()).WithRateGroup("compactionPlanHandler.checkDelay", 1.0, 60.0)
maxExecDuration := maxCompactionTaskExecutionDuration[t.GetType()]
startTime := time.Unix(t.GetStartTime(), 0)
maxExecDuration := maxCompactionTaskExecutionDuration[t.GetTaskProto().GetType()]
startTime := time.Unix(t.GetTaskProto().GetStartTime(), 0)
execDuration := time.Since(startTime)
if execDuration >= maxExecDuration {
log.RatedWarn(60, "compaction task is delay",
zap.Int64("planID", t.GetPlanID()),
zap.String("type", t.GetType().String()),
zap.String("state", t.GetState().String()),
zap.String("vchannel", t.GetChannel()),
zap.Int64("nodeID", t.GetNodeID()),
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.String("type", t.GetTaskProto().GetType().String()),
zap.String("state", t.GetTaskProto().GetState().String()),
zap.String("vchannel", t.GetTaskProto().GetChannel()),
zap.Int64("nodeID", t.GetTaskProto().GetNodeID()),
zap.Time("startTime", startTime),
zap.Duration("execDuration", execDuration))
}

View File

@ -157,11 +157,11 @@ func (q *CompactionQueue) Len() int {
var (
DefaultPrioritizer Prioritizer = func(task CompactionTask) int {
return int(task.GetPlanID())
return int(task.GetTaskProto().GetPlanID())
}
LevelPrioritizer Prioritizer = func(task CompactionTask) int {
switch task.GetType() {
switch task.GetTaskProto().GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
return 1
case datapb.CompactionType_MixCompaction:
@ -174,7 +174,7 @@ var (
}
MixFirstPrioritizer Prioritizer = func(task CompactionTask) int {
switch task.GetType() {
switch task.GetTaskProto().GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
return 10
case datapb.CompactionType_MixCompaction:

View File

@ -26,26 +26,23 @@ import (
)
func TestCompactionQueue(t *testing.T) {
t1 := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 3,
Type: datapb.CompactionType_MixCompaction,
},
}
t1 := &mixCompactionTask{}
t1.SetTask(&datapb.CompactionTask{
PlanID: 3,
Type: datapb.CompactionType_MixCompaction,
})
t2 := &l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
},
}
t2 := &l0CompactionTask{}
t2.SetTask(&datapb.CompactionTask{
PlanID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
})
t3 := &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 2,
Type: datapb.CompactionType_ClusteringCompaction,
},
}
t3 := &clusteringCompactionTask{}
t3.SetTask(&datapb.CompactionTask{
PlanID: 2,
Type: datapb.CompactionType_ClusteringCompaction,
})
t.Run("default prioritizer", func(t *testing.T) {
cq := NewCompactionQueue(3, DefaultPrioritizer)
@ -60,13 +57,13 @@ func TestCompactionQueue(t *testing.T) {
task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(1), task.GetPlanID())
assert.Equal(t, int64(1), task.GetTaskProto().GetPlanID())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(2), task.GetPlanID())
assert.Equal(t, int64(2), task.GetTaskProto().GetPlanID())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(3), task.GetPlanID())
assert.Equal(t, int64(3), task.GetTaskProto().GetPlanID())
})
t.Run("level prioritizer", func(t *testing.T) {
@ -82,13 +79,13 @@ func TestCompactionQueue(t *testing.T) {
task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetTaskProto().GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetTaskProto().GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetTaskProto().GetType())
})
t.Run("mix first prioritizer", func(t *testing.T) {
@ -104,13 +101,13 @@ func TestCompactionQueue(t *testing.T) {
task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetTaskProto().GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetTaskProto().GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetTaskProto().GetType())
})
t.Run("update prioritizer", func(t *testing.T) {
@ -126,15 +123,15 @@ func TestCompactionQueue(t *testing.T) {
task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetTaskProto().GetType())
cq.UpdatePrioritizer(DefaultPrioritizer)
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(2), task.GetPlanID())
assert.Equal(t, int64(2), task.GetTaskProto().GetPlanID())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(3), task.GetPlanID())
assert.Equal(t, int64(3), task.GetTaskProto().GetPlanID())
})
}
@ -146,12 +143,11 @@ func TestConcurrency(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(c)
for i := 0; i < c; i++ {
t1 := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: int64(i),
Type: datapb.CompactionType_MixCompaction,
},
}
t1 := &mixCompactionTask{}
t1.SetTask(&datapb.CompactionTask{
PlanID: int64(i),
Type: datapb.CompactionType_MixCompaction,
})
go func() {
err := cq.Enqueue(t1)
assert.NoError(t, err)

View File

@ -19,7 +19,6 @@ package datacoord
import (
"go.opentelemetry.io/otel/trace"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
@ -27,34 +26,17 @@ type CompactionTask interface {
Process() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)
GetSlotUsage() int64
GetTriggerID() UniqueID
GetPlanID() UniqueID
GetState() datapb.CompactionTaskState
GetChannel() string
GetLabel() string
GetType() datapb.CompactionType
GetCollectionID() int64
GetPartitionID() int64
GetInputSegments() []int64
GetStartTime() int64
GetTimeoutInSeconds() int32
GetPos() *msgpb.MsgPosition
GetPlan() *datapb.CompactionPlan
GetResult() *datapb.CompactionPlanResult
GetNodeID() UniqueID
GetSpan() trace.Span
ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask
SetNodeID(UniqueID) error
SetTask(*datapb.CompactionTask)
SetSpan(trace.Span)
SetResult(*datapb.CompactionPlanResult)
EndSpan()
CleanLogPath()
GetTaskProto() *datapb.CompactionTask
SetPlan(plan *datapb.CompactionPlan)
ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask
SetNodeID(UniqueID) error
NeedReAssignNodeID() bool
GetSpan() trace.Span
SetSpan(trace.Span)
SaveTaskMeta() error
}
@ -119,3 +101,9 @@ func setLastStateStartTime(lastStateStartTime int64) compactionTaskOpt {
task.LastStateStartTime = lastStateStartTime
}
}
func setAnalyzeTaskID(id int64) compactionTaskOpt {
return func(task *datapb.CompactionTask) {
task.AnalyzeTaskID = id
}
}
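
State changes complement the snapshot reads: as the clusteringCompactionTask hunks below show, a task never writes fields on the published proto. It clones the snapshot with ShadowClone, applies compactionTaskOpt functions such as setState or setAnalyzeTaskID to the clone, persists the clone, and only then publishes it via SetTask. The following is a hedged sketch of that write path, again using stand-in types rather than the real datapb and meta definitions.

package main

import (
	"fmt"
	"sync/atomic"
)

type taskState int

const (
	statePipelining taskState = iota
	stateExecuting
)

// taskProto stands in for *datapb.CompactionTask.
type taskProto struct {
	PlanID     int64
	State      taskState
	RetryTimes int32
}

// compactionTaskOpt mirrors the functional options defined in this file.
type compactionTaskOpt func(*taskProto)

func setState(s taskState) compactionTaskOpt {
	return func(p *taskProto) { p.State = s }
}

func setRetryTimes(n int32) compactionTaskOpt {
	return func(p *taskProto) { p.RetryTimes = n }
}

type task struct {
	proto atomic.Value // *taskProto
}

func (t *task) getTaskProto() *taskProto { return t.proto.Load().(*taskProto) }

// shadowClone copies the current proto and applies opts to the copy only.
func (t *task) shadowClone(opts ...compactionTaskOpt) *taskProto {
	clone := *t.getTaskProto()
	for _, opt := range opts {
		opt(&clone)
	}
	return &clone
}

// updateAndSaveTaskMeta would persist the clone (etcd in the real code);
// the atomic store happens only after that save succeeds.
func (t *task) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
	clone := t.shadowClone(opts...)
	// saveTaskMeta(clone) would go here; assume it succeeded.
	t.proto.Store(clone)
	return nil
}

func main() {
	tk := &task{}
	tk.proto.Store(&taskProto{PlanID: 7, State: statePipelining})
	_ = tk.updateAndSaveTaskMeta(setState(stateExecuting), setRetryTimes(0))
	fmt.Println(tk.getTaskProto().State, tk.getTaskProto().RetryTimes)
}

Because the old snapshot is never touched, goroutines that already hold it (the scheduler loop, metrics, logging) keep seeing consistent values even while an update is in flight.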

View File

@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -47,9 +48,9 @@ import (
var _ CompactionTask = (*clusteringCompactionTask)(nil)
type clusteringCompactionTask struct {
*datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
taskProto atomic.Value // *datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span
allocator allocator.Allocator
@ -62,9 +63,16 @@ type clusteringCompactionTask struct {
slotUsage int64
}
func (t *clusteringCompactionTask) GetTaskProto() *datapb.CompactionTask {
task := t.taskProto.Load()
if task == nil {
return nil
}
return task.(*datapb.CompactionTask)
}
func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager, handler Handler, analyzeScheduler *taskScheduler) *clusteringCompactionTask {
return &clusteringCompactionTask{
CompactionTask: t,
task := &clusteringCompactionTask{
allocator: allocator,
meta: meta,
sessions: session,
@ -73,17 +81,19 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.A
maxRetryTimes: 3,
slotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
}
task.taskProto.Store(t)
return task
}
func (t *clusteringCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
lastState := t.GetState().String()
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
lastState := t.GetTaskProto().GetState().String()
err := t.retryableProcess()
if err != nil {
log.Warn("fail in process task", zap.Error(err))
if merr.IsRetryableErr(err) && t.RetryTimes < t.maxRetryTimes {
if merr.IsRetryableErr(err) && t.GetTaskProto().RetryTimes < t.maxRetryTimes {
// retry in next Process
err = t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1))
err = t.updateAndSaveTaskMeta(setRetryTimes(t.GetTaskProto().RetryTimes + 1))
} else {
log.Error("task fail with unretryable reason or meet max retry times", zap.Error(err))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
@ -93,22 +103,22 @@ func (t *clusteringCompactionTask) Process() bool {
}
}
// task state update, refresh retry times count
currentState := t.State.String()
currentState := t.GetTaskProto().State.String()
if currentState != lastState {
ts := time.Now().Unix()
lastStateDuration := ts - t.GetLastStateStartTime()
lastStateDuration := ts - t.GetTaskProto().GetLastStateStartTime()
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
Observe(float64(lastStateDuration * 1000))
updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}
if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned {
updateOps = append(updateOps, setEndTime(ts))
elapse := ts - t.StartTime
elapse := ts - t.GetTaskProto().StartTime
log.Info("clustering compaction task total elapse", zap.Int64("elapse seconds", elapse))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
Observe(float64(elapse * 1000))
}
err = t.updateAndSaveTaskMeta(updateOps...)
@ -117,29 +127,29 @@ func (t *clusteringCompactionTask) Process() bool {
}
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
return t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned
}
// retryableProcess process task's state transfer, return error if not work as expected
// the outer Process will set state and retry times according to the error type(retryable or not-retryable)
func (t *clusteringCompactionTask) retryableProcess() error {
if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned {
return nil
}
coll, err := t.handler.GetCollection(context.Background(), t.GetCollectionID())
coll, err := t.handler.GetCollection(context.Background(), t.GetTaskProto().GetCollectionID())
if err != nil {
// retryable
log.Warn("fail to get collection", zap.Int64("collectionID", t.GetCollectionID()), zap.Error(err))
return merr.WrapErrClusteringCompactionGetCollectionFail(t.GetCollectionID(), err)
log.Warn("fail to get collection", zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Error(err))
return merr.WrapErrClusteringCompactionGetCollectionFail(t.GetTaskProto().GetCollectionID(), err)
}
if coll == nil {
// not-retryable fail fast if collection is dropped
log.Warn("collection not found, it may be dropped, stop clustering compaction task", zap.Int64("collectionID", t.GetCollectionID()))
return merr.WrapErrCollectionNotFound(t.GetCollectionID())
log.Warn("collection not found, it may be dropped, stop clustering compaction task", zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
return merr.WrapErrCollectionNotFound(t.GetTaskProto().GetCollectionID())
}
switch t.State {
switch t.GetTaskProto().State {
case datapb.CompactionTaskState_pipelining:
return t.processPipelining()
case datapb.CompactionTaskState_executing:
@ -166,27 +176,28 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
if err != nil {
return nil, err
}
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
Channel: t.GetChannel(),
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
ClusteringKeyField: t.GetClusteringKeyField().GetFieldID(),
MaxSegmentRows: t.GetMaxSegmentRows(),
PreferSegmentRows: t.GetPreferSegmentRows(),
AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
AnalyzeSegmentIds: t.GetInputSegments(),
PlanID: taskProto.GetPlanID(),
StartTime: taskProto.GetStartTime(),
TimeoutInSeconds: taskProto.GetTimeoutInSeconds(),
Type: taskProto.GetType(),
Channel: taskProto.GetChannel(),
CollectionTtl: taskProto.GetCollectionTtl(),
TotalRows: taskProto.GetTotalRows(),
Schema: taskProto.GetSchema(),
ClusteringKeyField: taskProto.GetClusteringKeyField().GetFieldID(),
MaxSegmentRows: taskProto.GetMaxSegmentRows(),
PreferSegmentRows: taskProto.GetPreferSegmentRows(),
AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(taskProto.AnalyzeTaskID, taskProto.AnalyzeVersion)),
AnalyzeSegmentIds: taskProto.GetInputSegments(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: t.GetPreAllocatedSegmentIDs(),
PreAllocatedSegmentIDs: taskProto.GetPreAllocatedSegmentIDs(),
SlotUsage: t.GetSlotUsage(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
for _, segID := range t.GetInputSegments() {
for _, segID := range taskProto.GetInputSegments() {
segInfo := t.meta.GetHealthySegment(segID)
if segInfo == nil {
return nil, merr.WrapErrSegmentNotFound(segID)
@ -208,14 +219,15 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
}
func (t *clusteringCompactionTask) processPipelining() error {
log := log.With(zap.Int64("triggerID", t.TriggerID), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("planID", t.GetPlanID()))
log := log.With(zap.Int64("triggerID", t.GetTaskProto().TriggerID), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
if t.NeedReAssignNodeID() {
log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
return nil
}
// don't mark segment level to L2 before clustering compaction after v2.5.0
if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) {
if typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType) {
err := t.doAnalyze()
if err != nil {
log.Warn("fail to submit analyze task", zap.Error(err))
@ -232,8 +244,8 @@ func (t *clusteringCompactionTask) processPipelining() error {
}
func (t *clusteringCompactionTask) processExecuting() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
result, err := t.sessions.GetCompactionPlanResult(t.GetTaskProto().GetNodeID(), t.GetTaskProto().GetPlanID())
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
@ -256,7 +268,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return segment.GetSegmentID()
})
_, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
_, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetTaskProto(), t.result)
if err != nil {
return err
}
@ -285,15 +297,15 @@ func (t *clusteringCompactionTask) processExecuting() error {
}
func (t *clusteringCompactionTask) processMetaSaved() error {
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
}
// to ensure compatibility, if a task upgraded from version 2.4 has a status of MetaSave,
// its TmpSegments will be empty, so skip the stats task, to build index.
if len(t.GetTmpSegments()) == 0 {
log.Info("tmp segments is nil, skip stats task", zap.Int64("planID", t.GetPlanID()))
if len(t.GetTaskProto().GetTmpSegments()) == 0 {
log.Info("tmp segments is nil, skip stats task", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic))
@ -301,11 +313,11 @@ func (t *clusteringCompactionTask) processMetaSaved() error {
func (t *clusteringCompactionTask) processStats() error {
// just the memory step, if it crashes at this step, the state after recovery is CompactionTaskState_statistic.
resultSegments := make([]int64, 0, len(t.GetTmpSegments()))
resultSegments := make([]int64, 0, len(t.GetTaskProto().GetTmpSegments()))
if Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
existNonStats := false
tmpToResultSegments := make(map[int64][]int64, len(t.GetTmpSegments()))
for _, segmentID := range t.GetTmpSegments() {
tmpToResultSegments := make(map[int64][]int64, len(t.GetTaskProto().GetTmpSegments()))
for _, segmentID := range t.GetTaskProto().GetTmpSegments() {
to, ok := t.meta.(*meta).GetCompactionTo(segmentID)
if !ok || to == nil {
select {
@ -324,16 +336,16 @@ func (t *clusteringCompactionTask) processStats() error {
}
if err := t.regeneratePartitionStats(tmpToResultSegments); err != nil {
log.Warn("regenerate partition stats failed, wait for retry", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("regenerate partition stats failed, wait for retry", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("regeneratePartitionStats", err)
}
} else {
log.Info("stats task is not enable, set tmp segments to result segments", zap.Int64("planID", t.GetPlanID()))
resultSegments = t.GetTmpSegments()
log.Info("stats task is not enable, set tmp segments to result segments", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
resultSegments = t.GetTaskProto().GetTmpSegments()
}
log.Info("clustering compaction stats task finished", zap.Int64("planID", t.GetPlanID()),
zap.Int64s("tmp segments", t.GetTmpSegments()),
log.Info("clustering compaction stats task finished", zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64s("tmp segments", t.GetTaskProto().GetTmpSegments()),
zap.Int64s("result segments", resultSegments))
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing), setResultSegments(resultSegments))
@ -351,18 +363,18 @@ func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments
return err
}
partitionStatsFile := path.Join(cli.RootPath(), common.PartitionStatsPath,
metautil.JoinIDPath(t.GetCollectionID(), t.GetPartitionID()), t.plan.GetChannel(),
strconv.FormatInt(t.GetPlanID(), 10))
metautil.JoinIDPath(t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID()), t.plan.GetChannel(),
strconv.FormatInt(t.GetTaskProto().GetPlanID(), 10))
value, err := cli.Read(ctx, partitionStatsFile)
if err != nil {
log.Warn("read partition stats file failed", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("read partition stats file failed", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
}
partitionStats, err := storage.DeserializePartitionsStatsSnapshot(value)
if err != nil {
log.Warn("deserialize partition stats failed", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("deserialize partition stats failed", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
}
@ -377,13 +389,13 @@ func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments
partitionStatsBytes, err := storage.SerializePartitionStatsSnapshot(partitionStats)
if err != nil {
log.Warn("serialize partition stats failed", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("serialize partition stats failed", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
}
err = cli.Write(ctx, partitionStatsFile, partitionStatsBytes)
if err != nil {
log.Warn("save partition stats file failed", zap.Int64("planID", t.GetPlanID()),
log.Warn("save partition stats file failed", zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.String("path", partitionStatsFile), zap.Error(err))
return err
}
@ -392,15 +404,15 @@ func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments
func (t *clusteringCompactionTask) processIndexing() error {
// wait for segment indexed
collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetCollectionID(), "")
collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetTaskProto().GetCollectionID(), "")
if len(collectionIndexes) == 0 {
log.Debug("the collection has no index, no need to do indexing")
return t.completeTask()
}
indexed := func() bool {
for _, collectionIndex := range collectionIndexes {
for _, segmentID := range t.GetResultSegments() {
segmentIndexState := t.meta.GetIndexMeta().GetSegmentIndexState(t.GetCollectionID(), segmentID, collectionIndex.IndexID)
for _, segmentID := range t.GetTaskProto().GetResultSegments() {
segmentIndexState := t.meta.GetIndexMeta().GetSegmentIndexState(t.GetTaskProto().GetCollectionID(), segmentID, collectionIndex.IndexID)
log.Debug("segment index state", zap.String("segment", segmentIndexState.String()))
if segmentIndexState.GetState() != commonpb.IndexState_Finished {
return false
@ -409,7 +421,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
}
return true
}()
log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetPlanID()), zap.Int64s("segments", t.ResultSegments))
log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Int64s("segments", t.GetTaskProto().ResultSegments))
if indexed {
return t.completeTask()
}
@ -418,9 +430,9 @@ func (t *clusteringCompactionTask) processIndexing() error {
func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, UpdateSegmentVisible(segID))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID()))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}
err := t.meta.UpdateSegmentsInfo(operators...)
@ -434,7 +446,7 @@ func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
var operators []UpdateOperator
// mark
for _, segID := range t.GetInputSegments() {
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
@ -457,11 +469,11 @@ func (t *clusteringCompactionTask) completeTask() error {
// update current partition stats version
// at this point, the segment view includes both the input segments and the result segments.
if err = t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
VChannel: t.GetChannel(),
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
CollectionID: t.GetTaskProto().GetCollectionID(),
PartitionID: t.GetTaskProto().GetPartitionID(),
VChannel: t.GetTaskProto().GetChannel(),
Version: t.GetTaskProto().GetPlanID(),
SegmentIDs: t.GetTaskProto().GetResultSegments(),
CommitTime: time.Now().Unix(),
}); err != nil {
return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
@ -477,23 +489,23 @@ func (t *clusteringCompactionTask) completeTask() error {
}
func (t *clusteringCompactionTask) processAnalyzing() error {
analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetAnalyzeTaskID())
analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetTaskProto().GetAnalyzeTaskID())
if analyzeTask == nil {
log.Warn("analyzeTask not found", zap.Int64("id", t.GetAnalyzeTaskID()))
return merr.WrapErrAnalyzeTaskNotFound(t.GetAnalyzeTaskID()) // retryable
log.Warn("analyzeTask not found", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()))
return merr.WrapErrAnalyzeTaskNotFound(t.GetTaskProto().GetAnalyzeTaskID()) // retryable
}
log.Info("check analyze task state", zap.Int64("id", t.GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
log.Info("check analyze task state", zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
switch analyzeTask.State {
case indexpb.JobState_JobStateFinished:
if analyzeTask.GetCentroidsFile() == "" {
// not retryable, fake finished vector clustering is not supported in opensource
return merr.WrapErrClusteringCompactionNotSupportVector()
} else {
t.AnalyzeVersion = analyzeTask.GetVersion()
t.GetTaskProto().AnalyzeVersion = analyzeTask.GetVersion()
return t.doCompact()
}
case indexpb.JobState_JobStateFailed:
log.Warn("analyze task fail", zap.Int64("analyzeID", t.GetAnalyzeTaskID()))
log.Warn("analyze task fail", zap.Int64("analyzeID", t.GetTaskProto().GetAnalyzeTaskID()))
return errors.New(analyzeTask.FailReason)
default:
}
@ -501,18 +513,19 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
}
func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
t.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
}
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("state", t.GetTaskProto().GetState().String()))
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
}
isInputDropped := false
for _, segID := range t.GetInputSegments() {
for _, segID := range t.GetTaskProto().GetInputSegments() {
if t.meta.GetHealthySegment(segID) == nil {
isInputDropped = true
break
@ -520,22 +533,22 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
}
if isInputDropped {
log.Info("input segments dropped, doing for compatibility",
zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
// this task must be generated by v2.4, just for compatibility
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
for _, segID := range t.GetInputSegments() {
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
// if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
for _, segID := range t.GetResultSegments() {
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
for _, segID := range t.GetTmpSegments() {
for _, segID := range t.GetTaskProto().GetTmpSegments() {
// maybe no necessary, there will be no `TmpSegments` that task was generated by v2.4
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
@ -548,16 +561,15 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
} else {
// after v2.5.0, mark the results segment as dropped
var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
for _, segID := range t.GetTaskProto().GetResultSegments() {
// Don't worry about them being loaded; they are all invisible.
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
for _, segID := range t.GetTmpSegments() {
for _, segID := range t.GetTaskProto().GetTmpSegments() {
// Don't worry about them being loaded; they are all invisible.
// tmpSegment is always invisible
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
@ -569,11 +581,11 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
// drop partition stats if uploaded
partitionStatsInfo := &datapb.PartitionStatsInfo{
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
VChannel: t.GetChannel(),
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
CollectionID: t.GetTaskProto().GetCollectionID(),
PartitionID: t.GetTaskProto().GetPartitionID(),
VChannel: t.GetTaskProto().GetChannel(),
Version: t.GetTaskProto().GetPlanID(),
SegmentIDs: t.GetTaskProto().GetResultSegments(),
}
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
@ -585,34 +597,34 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
func (t *clusteringCompactionTask) doAnalyze() error {
analyzeTask := &indexpb.AnalyzeTask{
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
FieldID: t.GetClusteringKeyField().FieldID,
FieldName: t.GetClusteringKeyField().Name,
FieldType: t.GetClusteringKeyField().DataType,
SegmentIDs: t.GetInputSegments(),
TaskID: t.GetAnalyzeTaskID(),
CollectionID: t.GetTaskProto().GetCollectionID(),
PartitionID: t.GetTaskProto().GetPartitionID(),
FieldID: t.GetTaskProto().GetClusteringKeyField().FieldID,
FieldName: t.GetTaskProto().GetClusteringKeyField().Name,
FieldType: t.GetTaskProto().GetClusteringKeyField().DataType,
SegmentIDs: t.GetTaskProto().GetInputSegments(),
TaskID: t.GetTaskProto().GetAnalyzeTaskID(),
State: indexpb.JobState_JobStateInit,
}
err := t.meta.GetAnalyzeMeta().AddAnalyzeTask(analyzeTask)
if err != nil {
log.Warn("failed to create analyze task", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("failed to create analyze task", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
}
t.analyzeScheduler.enqueue(newAnalyzeTask(t.GetAnalyzeTaskID()))
t.analyzeScheduler.enqueue(newAnalyzeTask(t.GetTaskProto().GetAnalyzeTaskID()))
log.Info("submit analyze task", zap.Int64("planID", t.GetPlanID()), zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("id", t.GetAnalyzeTaskID()))
log.Info("submit analyze task", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Int64("id", t.GetTaskProto().GetAnalyzeTaskID()))
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
}
func (t *clusteringCompactionTask) doCompact() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
if t.NeedReAssignNodeID() {
log.RatedWarn(10, "not assign nodeID")
return nil
}
log = log.With(zap.Int64("nodeID", t.GetNodeID()))
log = log.With(zap.Int64("nodeID", t.GetTaskProto().GetNodeID()))
// todo refine this logic: GetCompactionPlanResult return a fail result when this is no compaction in datanode which is weird
// check whether the compaction plan is already submitted considering
@ -639,7 +651,7 @@ func (t *clusteringCompactionTask) doCompact() error {
log.Warn("Failed to BuildCompactionRequest", zap.Error(err))
return err
}
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
err = t.sessions.Compaction(context.Background(), t.GetTaskProto().GetNodeID(), t.GetPlan())
if err != nil {
if errors.Is(err, merr.ErrDataNodeSlotExhausted) {
log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted")
@ -652,7 +664,7 @@ func (t *clusteringCompactionTask) doCompact() error {
}
func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
taskClone := proto.Clone(t).(*datapb.CompactionTask)
taskClone := proto.Clone(t.GetTaskProto()).(*datapb.CompactionTask)
for _, opt := range opts {
opt(taskClone)
}
@ -666,17 +678,17 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO
log.Warn("Failed to saveTaskMeta", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
}
t.CompactionTask = task
t.SetTask(task)
return nil
}
func (t *clusteringCompactionTask) checkTimeout() bool {
if t.GetTimeoutInSeconds() > 0 {
diff := time.Since(time.Unix(t.GetStartTime(), 0)).Seconds()
if diff > float64(t.GetTimeoutInSeconds()) {
if t.GetTaskProto().GetTimeoutInSeconds() > 0 {
diff := time.Since(time.Unix(t.GetTaskProto().GetStartTime(), 0)).Seconds()
if diff > float64(t.GetTaskProto().GetTimeoutInSeconds()) {
log.Warn("compaction timeout",
zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()),
zap.Int64("startTime", t.GetStartTime()),
zap.Int32("timeout in seconds", t.GetTaskProto().GetTimeoutInSeconds()),
zap.Int64("startTime", t.GetTaskProto().GetStartTime()),
)
return true
}
@ -689,7 +701,7 @@ func (t *clusteringCompactionTask) saveTaskMeta(task *datapb.CompactionTask) err
}
func (t *clusteringCompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.CompactionTask)
return t.saveTaskMeta(t.GetTaskProto())
}
func (t *clusteringCompactionTask) GetPlan() *datapb.CompactionPlan {
@ -722,8 +734,8 @@ func (t *clusteringCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
}
func (t *clusteringCompactionTask) SetTask(ct *datapb.CompactionTask) {
t.CompactionTask = ct
func (t *clusteringCompactionTask) SetTask(task *datapb.CompactionTask) {
t.taskProto.Store(task)
}
func (t *clusteringCompactionTask) SetNodeID(id UniqueID) error {
@ -731,30 +743,13 @@ func (t *clusteringCompactionTask) SetNodeID(id UniqueID) error {
}
func (t *clusteringCompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
return fmt.Sprintf("%d-%s", t.GetTaskProto().PartitionID, t.GetTaskProto().GetChannel())
}
func (t *clusteringCompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
return t.GetTaskProto().GetState() == datapb.CompactionTaskState_pipelining && (t.GetTaskProto().GetNodeID() == 0 || t.GetTaskProto().GetNodeID() == NullNodeID)
}
func (t *clusteringCompactionTask) GetSlotUsage() int64 {
return t.slotUsage
}
func (t *clusteringCompactionTask) CleanLogPath() {
if t.plan.GetSegmentBinlogs() != nil {
for _, binlogs := range t.plan.GetSegmentBinlogs() {
binlogs.FieldBinlogs = nil
binlogs.Field2StatslogPaths = nil
binlogs.Deltalogs = nil
}
}
if t.result.GetSegments() != nil {
for _, segment := range t.result.GetSegments() {
segment.InsertLogs = nil
segment.Deltalogs = nil
segment.Field2StatslogPaths = nil
}
}
}

View File

@ -160,8 +160,8 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task := s.generateBasicTask(false)
task.sessions = s.mockSessionMgr
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
task.InputSegments = []int64{101, 102}
task.ResultSegments = []int64{103, 104}
task.GetTaskProto().InputSegments = []int64{101, 102}
task.GetTaskProto().ResultSegments = []int64{103, 104}
task.processFailedOrTimeout()
@ -248,9 +248,9 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task := s.generateBasicTask(false)
task.sessions = s.mockSessionMgr
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
task.InputSegments = []int64{101, 102}
task.TmpSegments = []int64{103, 104}
task.ResultSegments = []int64{105, 106}
task.GetTaskProto().InputSegments = []int64{101, 102}
task.GetTaskProto().TmpSegments = []int64{103, 104}
task.GetTaskProto().ResultSegments = []int64{105, 106}
task.processFailedOrTimeout()
@ -330,23 +330,23 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
task.maxRetryTimes = 3
// process pipelining fail
s.Equal(false, task.Process())
s.Equal(int32(1), task.RetryTimes)
s.Equal(int32(1), task.GetTaskProto().RetryTimes)
s.Equal(false, task.Process())
s.Equal(int32(2), task.RetryTimes)
s.Equal(int32(2), task.GetTaskProto().RetryTimes)
s.Equal(false, task.Process())
s.Equal(int32(3), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
s.Equal(int32(3), task.GetTaskProto().RetryTimes)
s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState())
s.Equal(false, task.Process())
s.Equal(int32(0), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(int32(0), task.GetTaskProto().RetryTimes)
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
}
func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
s.Run("process pipelining fail, segment not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_pipelining
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})
s.Run("pipelining fail, no datanode slot", func() {
@ -367,14 +367,14 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
},
})
s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrDataNodeSlotExhausted())
task.State = datapb.CompactionTaskState_pipelining
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.False(task.Process())
s.Equal(int64(NullNodeID), task.GetNodeID())
s.Equal(int64(NullNodeID), task.GetTaskProto().GetNodeID())
})
s.Run("process succeed, scalar clustering key", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_pipelining
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
@ -391,14 +391,14 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
},
})
s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task.State = datapb.CompactionTaskState_pipelining
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
s.Equal(datapb.CompactionTaskState_executing, task.GetTaskProto().GetState())
})
s.Run("process succeed, vector clustering key", func() {
task := s.generateBasicTask(true)
task.State = datapb.CompactionTaskState_pipelining
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
@ -414,16 +414,16 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
PartitionStatsVersion: 10000,
},
})
task.State = datapb.CompactionTaskState_pipelining
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_analyzing, task.GetState())
s.Equal(datapb.CompactionTaskState_analyzing, task.GetTaskProto().GetState())
})
}
func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
s.Run("process executing, get compaction result fail", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
@ -441,12 +441,12 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
})
s.mockSessionMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(1)).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState())
})
s.Run("process executing, compaction result not ready", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
@ -464,17 +464,17 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
})
s.mockSessionMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
s.Equal(datapb.CompactionTaskState_executing, task.GetTaskProto().GetState())
s.mockSessionMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_executing,
}, nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
s.Equal(datapb.CompactionTaskState_executing, task.GetTaskProto().GetState())
})
s.Run("process executing, scalar clustering key, compaction result ready", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
@ -503,12 +503,12 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
}, nil).Once()
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_statistic, task.GetState())
s.Equal(datapb.CompactionTaskState_statistic, task.GetTaskProto().GetState())
})
s.Run("process executing, compaction result ready", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
@ -538,14 +538,12 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
// DropCompactionPlan fail
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(merr.WrapErrNodeNotFound(1)).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_statistic, task.GetState())
s.Equal(datapb.CompactionTaskState_statistic, task.GetTaskProto().GetState())
})
s.Run("process executing, compaction result timeout", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_executing
task.StartTime = time.Now().Unix()
task.TimeoutInSeconds = 1
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing), setStartTime(time.Now().Unix()), setTimeoutInSeconds(1))
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
@ -576,7 +574,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().GetState())
})
}
@ -586,25 +584,25 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.NoError(task.processExecuting())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
s.mockSessionMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.NoError(task.processExecuting())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
s.mockSessionMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_pipelining,
}, nil).Once()
s.NoError(task.processExecuting())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
s.mockSessionMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_completed,
}, nil).Once()
s.Error(task.processExecuting())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
s.mockSessionMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_completed,
@ -618,36 +616,36 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
},
}, nil).Once()
s.Error(task.processExecuting())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
}
func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
s.Run("collection has no index", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_indexing
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_completed, task.GetState())
s.Equal(datapb.CompactionTaskState_completed, task.GetTaskProto().GetState())
})
s.Run("collection has index, segment is not indexed", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_indexing
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
index := &model.Index{
CollectionID: 1,
IndexID: 3,
}
task.ResultSegments = []int64{10, 11}
task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11}))
err := s.meta.indexMeta.CreateIndex(index)
s.NoError(err)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
s.Equal(datapb.CompactionTaskState_indexing, task.GetTaskProto().GetState())
})
s.Run("collection has index, segment indexed", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_indexing
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
index := &model.Index{
CollectionID: 1,
IndexID: 3,
@ -669,62 +667,59 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
})
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_completed, task.GetState())
s.Equal(datapb.CompactionTaskState_completed, task.GetTaskProto().GetState())
})
}
func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.Run("analyze task not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})
s.Run("analyze task failed", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
task.AnalyzeTaskID = 7
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing), setAnalyzeTaskID(7))
t := &indexpb.AnalyzeTask{
CollectionID: task.CollectionID,
PartitionID: task.PartitionID,
FieldID: task.ClusteringKeyField.FieldID,
SegmentIDs: task.InputSegments,
CollectionID: task.GetTaskProto().CollectionID,
PartitionID: task.GetTaskProto().PartitionID,
FieldID: task.GetTaskProto().ClusteringKeyField.FieldID,
SegmentIDs: task.GetTaskProto().InputSegments,
TaskID: 7,
State: indexpb.JobState_JobStateFailed,
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})
s.Run("analyze task fake finish, vector not support", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
task.AnalyzeTaskID = 7
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing), setAnalyzeTaskID(7))
t := &indexpb.AnalyzeTask{
CollectionID: task.CollectionID,
PartitionID: task.PartitionID,
FieldID: task.ClusteringKeyField.FieldID,
SegmentIDs: task.InputSegments,
CollectionID: task.GetTaskProto().CollectionID,
PartitionID: task.GetTaskProto().PartitionID,
FieldID: task.GetTaskProto().ClusteringKeyField.FieldID,
SegmentIDs: task.GetTaskProto().InputSegments,
TaskID: 7,
State: indexpb.JobState_JobStateFinished,
CentroidsFile: "",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})
s.Run("analyze task finished", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
task.AnalyzeTaskID = 7
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing), setAnalyzeTaskID(7))
t := &indexpb.AnalyzeTask{
CollectionID: task.CollectionID,
PartitionID: task.PartitionID,
FieldID: task.ClusteringKeyField.FieldID,
SegmentIDs: task.InputSegments,
CollectionID: task.GetTaskProto().CollectionID,
PartitionID: task.GetTaskProto().PartitionID,
FieldID: task.GetTaskProto().ClusteringKeyField.FieldID,
SegmentIDs: task.GetTaskProto().InputSegments,
TaskID: 7,
State: indexpb.JobState_JobStateFinished,
CentroidsFile: "somewhere",
@ -748,7 +743,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_executing, task.GetState())
s.Equal(datapb.CompactionTaskState_executing, task.GetTaskProto().GetState())
})
}
@ -756,7 +751,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
func (s *ClusteringCompactionTaskSuite) TestCompleteTask() {
task := s.generateBasicTask(false)
task.completeTask()
partitionStats := s.meta.GetPartitionStatsMeta().GetPartitionStats(task.GetCollectionID(), task.GetPartitionID(), task.GetChannel(), task.GetPlanID())
partitionStats := s.meta.GetPartitionStatsMeta().GetPartitionStats(task.GetTaskProto().GetCollectionID(), task.GetTaskProto().GetPartitionID(), task.GetTaskProto().GetChannel(), task.GetTaskProto().GetPlanID())
s.True(partitionStats.GetCommitTime() > time.Now().Add(-2*time.Second).Unix())
}
@ -817,20 +812,18 @@ func ConstructClusteringSchema(collection string, dim int, autoID bool, vectorCl
func (s *ClusteringCompactionTaskSuite) TestProcessStatsState() {
s.Run("compaction to not exist", func() {
task := s.generateBasicTask(false)
task.TmpSegments = task.ResultSegments
task.State = datapb.CompactionTaskState_statistic
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic), setTmpSegments(task.GetTaskProto().GetResultSegments()))
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_statistic, task.GetState())
s.Equal(int32(0), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_statistic, task.GetTaskProto().GetState())
s.Equal(int32(0), task.GetTaskProto().RetryTimes)
})
s.Run("partition stats file not exist", func() {
task := s.generateBasicTask(false)
task.TmpSegments = task.ResultSegments
task.State = datapb.CompactionTaskState_statistic
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic), setTmpSegments(task.GetTaskProto().GetResultSegments()))
task.maxRetryTimes = 3
for _, segID := range task.GetTmpSegments() {
for _, segID := range task.GetTaskProto().GetTmpSegments() {
err := s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
@ -853,17 +846,16 @@ func (s *ClusteringCompactionTaskSuite) TestProcessStatsState() {
}
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_statistic, task.GetState())
s.Equal(int32(1), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_statistic, task.GetTaskProto().GetState())
s.Equal(int32(1), task.GetTaskProto().RetryTimes)
})
s.Run("partition stats deserialize failed", func() {
task := s.generateBasicTask(false)
task.TmpSegments = task.ResultSegments
task.State = datapb.CompactionTaskState_statistic
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic), setTmpSegments(task.GetTaskProto().GetResultSegments()))
task.maxRetryTimes = 3
for _, segID := range task.GetTmpSegments() {
for _, segID := range task.GetTaskProto().GetTmpSegments() {
err := s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
@ -886,8 +878,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessStatsState() {
}
partitionStatsFile := path.Join(Params.MinioCfg.RootPath.GetValue(), common.PartitionStatsPath,
metautil.JoinIDPath(task.GetCollectionID(), task.GetPartitionID()), task.plan.GetChannel(),
strconv.FormatInt(task.GetPlanID(), 10))
metautil.JoinIDPath(task.GetTaskProto().GetCollectionID(), task.GetTaskProto().GetPartitionID()), task.plan.GetChannel(),
strconv.FormatInt(task.GetTaskProto().GetPlanID(), 10))
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
@ -901,17 +893,16 @@ func (s *ClusteringCompactionTaskSuite) TestProcessStatsState() {
s.NoError(err)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_statistic, task.GetState())
s.Equal(int32(1), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_statistic, task.GetTaskProto().GetState())
s.Equal(int32(1), task.GetTaskProto().RetryTimes)
})
s.Run("normal case", func() {
task := s.generateBasicTask(false)
task.TmpSegments = task.ResultSegments
task.State = datapb.CompactionTaskState_statistic
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic), setTmpSegments(task.GetTaskProto().GetResultSegments()))
task.maxRetryTimes = 3
for _, segID := range task.GetTmpSegments() {
for _, segID := range task.GetTaskProto().GetTmpSegments() {
err := s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
@ -934,8 +925,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessStatsState() {
}
partitionStatsFile := path.Join(Params.MinioCfg.RootPath.GetValue(), common.PartitionStatsPath,
metautil.JoinIDPath(task.GetCollectionID(), task.GetPartitionID()), task.plan.GetChannel(),
strconv.FormatInt(task.GetPlanID(), 10))
metautil.JoinIDPath(task.GetTaskProto().GetCollectionID(), task.GetTaskProto().GetPartitionID()), task.plan.GetChannel(),
strconv.FormatInt(task.GetTaskProto().GetPlanID(), 10))
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
@ -947,10 +938,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessStatsState() {
partitionStats := &storage.PartitionStatsSnapshot{
SegmentStats: make(map[int64]storage.SegmentStats),
Version: task.GetPlanID(),
Version: task.GetTaskProto().GetPlanID(),
}
for _, segID := range task.GetTmpSegments() {
for _, segID := range task.GetTaskProto().GetTmpSegments() {
partitionStats.SegmentStats[segID] = storage.SegmentStats{
FieldStats: []storage.FieldStats{
{
@ -968,21 +959,19 @@ func (s *ClusteringCompactionTaskSuite) TestProcessStatsState() {
s.NoError(err)
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
s.Equal(int32(0), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_indexing, task.GetTaskProto().GetState())
s.Equal(int32(0), task.GetTaskProto().RetryTimes)
})
s.Run("not enable stats task", func() {
Params.Save(Params.DataCoordCfg.EnableStatsTask.Key, "false")
defer Params.Reset(Params.DataCoordCfg.EnableStatsTask.Key)
task := s.generateBasicTask(false)
task.TmpSegments = task.ResultSegments
task.State = datapb.CompactionTaskState_statistic
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic), setTmpSegments(task.GetTaskProto().GetResultSegments()), setResultSegments(nil))
task.maxRetryTimes = 3
task.ResultSegments = nil
s.False(task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
s.Equal(int32(0), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_indexing, task.GetTaskProto().GetState())
s.Equal(int32(0), task.GetTaskProto().RetryTimes)
})
}
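
Across this test file the change is uniform: tests stop assigning proto fields in place (task.State = ..., task.TmpSegments = ..., task.ResultSegments = nil) and instead call updateAndSaveTaskMeta with functional options (setState, setTmpSegments, setResultSegments, setAnalyzeTaskID, ...), while every assertion reads a snapshot through task.GetTaskProto(). The sketch below illustrates the copy-on-write idea behind that option-based update; the type and option names are simplified stand-ins rather than the real datacoord definitions, and it uses the standard library's sync/atomic in place of the go.uber.org/atomic wrapper imported by the production code.

package main

import (
	"fmt"
	"sync/atomic"
)

// taskProto is a simplified stand-in for the generated datapb.CompactionTask.
type taskProto struct {
	State      int32
	NodeID     int64
	RetryTimes int32
}

// option mirrors the compactionTaskOpt pattern: a function applied to a clone.
type option func(*taskProto)

func setState(s int32) option   { return func(p *taskProto) { p.State = s } }
func setNodeID(id int64) option { return func(p *taskProto) { p.NodeID = id } }

type task struct {
	proto atomic.Value // holds *taskProto; replaced wholesale, never mutated in place
}

// getProto returns the current immutable snapshot.
func (t *task) getProto() *taskProto {
	v := t.proto.Load()
	if v == nil {
		return nil
	}
	return v.(*taskProto)
}

// update clones the snapshot, applies the options, and atomically swaps the
// clone in; the real updateAndSaveTaskMeta also persists the clone to meta
// before storing it.
func (t *task) update(opts ...option) {
	clone := *t.getProto() // proto.Clone in the real code
	for _, opt := range opts {
		opt(&clone)
	}
	t.proto.Store(&clone)
}

func main() {
	tk := &task{}
	tk.proto.Store(&taskProto{NodeID: -1})
	tk.update(setState(1), setNodeID(100))
	fmt.Println(tk.getProto().State, tk.getProto().NodeID) // 1 100
}

The property the tests now rely on is that a published snapshot is never mutated afterwards; writers always publish a fresh clone, so a reader holding an older pointer still sees internally consistent values.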


@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -40,9 +41,9 @@ import (
var _ CompactionTask = (*l0CompactionTask)(nil)
type l0CompactionTask struct {
*datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
taskProto atomic.Value // *datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span
allocator allocator.Allocator
@ -52,20 +53,29 @@ type l0CompactionTask struct {
slotUsage int64
}
func newL0CompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager) *l0CompactionTask {
return &l0CompactionTask{
CompactionTask: t,
allocator: allocator,
meta: meta,
sessions: session,
slotUsage: paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
func (t *l0CompactionTask) GetTaskProto() *datapb.CompactionTask {
task := t.taskProto.Load()
if task == nil {
return nil
}
return task.(*datapb.CompactionTask)
}
func newL0CompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager) *l0CompactionTask {
task := &l0CompactionTask{
allocator: allocator,
meta: meta,
sessions: session,
slotUsage: paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
}
task.taskProto.Store(t)
return task
}
// Note: return True means exit this state machine.
// ONLY return True for processCompleted or processFailed
func (t *l0CompactionTask) Process() bool {
switch t.GetState() {
switch t.GetTaskProto().GetState() {
case datapb.CompactionTaskState_pipelining:
return t.processPipelining()
case datapb.CompactionTaskState_executing:
@ -87,7 +97,7 @@ func (t *l0CompactionTask) processPipelining() bool {
return false
}
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("nodeID", t.GetNodeID()))
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("nodeID", t.GetTaskProto().GetNodeID()))
var err error
t.plan, err = t.BuildCompactionRequest()
if err != nil {
@ -101,9 +111,9 @@ func (t *l0CompactionTask) processPipelining() bool {
return t.processFailed()
}
err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan())
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false
}
@ -113,8 +123,8 @@ func (t *l0CompactionTask) processPipelining() bool {
}
func (t *l0CompactionTask) processExecuting() bool {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.Int64("nodeID", t.GetNodeID()))
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Int64("nodeID", t.GetTaskProto().GetNodeID()))
result, err := t.sessions.GetCompactionPlanResult(t.GetTaskProto().GetNodeID(), t.GetTaskProto().GetPlanID())
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
@ -158,7 +168,7 @@ func (t *l0CompactionTask) processExecuting() bool {
func (t *l0CompactionTask) processMetaSaved() bool {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err != nil {
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return false
}
return t.processCompleted()
@ -166,17 +176,17 @@ func (t *l0CompactionTask) processMetaSaved() bool {
func (t *l0CompactionTask) processCompleted() bool {
if t.hasAssignedWorker() {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
}
}
t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID()))
log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
return true
}
@ -192,11 +202,11 @@ func (t *l0CompactionTask) processTimeout() bool {
func (t *l0CompactionTask) processFailed() bool {
if t.hasAssignedWorker() {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
}
}
@ -207,7 +217,7 @@ func (t *l0CompactionTask) processFailed() bool {
return false
}
log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()))
return true
}
@ -220,7 +230,7 @@ func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
}
func (t *l0CompactionTask) SetTask(task *datapb.CompactionTask) {
t.CompactionTask = task
t.taskProto.Store(task)
}
func (t *l0CompactionTask) GetSpan() trace.Span {
@ -245,40 +255,16 @@ func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan {
return t.plan
}
func (t *l0CompactionTask) SetStartTime(startTime int64) {
t.StartTime = startTime
}
func (t *l0CompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
return fmt.Sprintf("%d-%s", t.GetTaskProto().PartitionID, t.GetTaskProto().GetChannel())
}
func (t *l0CompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && (!t.hasAssignedWorker())
}
func (t *l0CompactionTask) CleanLogPath() {
if t.plan == nil {
return
}
if t.plan.GetSegmentBinlogs() != nil {
for _, binlogs := range t.plan.GetSegmentBinlogs() {
binlogs.FieldBinlogs = nil
binlogs.Field2StatslogPaths = nil
binlogs.Deltalogs = nil
}
}
if t.result.GetSegments() != nil {
for _, segment := range t.result.GetSegments() {
segment.InsertLogs = nil
segment.Deltalogs = nil
segment.Field2StatslogPaths = nil
}
}
return t.GetTaskProto().GetState() == datapb.CompactionTaskState_pipelining && (!t.hasAssignedWorker())
}
func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
taskClone := proto.Clone(t).(*datapb.CompactionTask)
taskClone := proto.Clone(t.GetTaskProto()).(*datapb.CompactionTask)
for _, opt := range opts {
opt(taskClone)
}
@ -290,21 +276,22 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
if err != nil {
return nil, err
}
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
Channel: t.GetChannel(),
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
PlanID: taskProto.GetPlanID(),
StartTime: taskProto.GetStartTime(),
TimeoutInSeconds: taskProto.GetTimeoutInSeconds(),
Type: taskProto.GetType(),
Channel: taskProto.GetChannel(),
CollectionTtl: taskProto.GetCollectionTtl(),
TotalRows: taskProto.GetTotalRows(),
Schema: taskProto.GetSchema(),
BeginLogID: beginLogID,
SlotUsage: t.GetSlotUsage(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
for _, segID := range t.GetInputSegments() {
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
for _, segID := range taskProto.GetInputSegments() {
segInfo := t.meta.GetHealthySegment(segID)
if segInfo == nil {
return nil, merr.WrapErrSegmentNotFound(segID)
@ -322,19 +309,19 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
// Select sealed L1 segments for LevelZero compaction that meets the condition:
// dmlPos < triggerInfo.pos
sealedSegments := t.meta.SelectSegments(WithCollection(t.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (t.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == t.GetPartitionID()) &&
sealedSegments := t.meta.SelectSegments(WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) &&
info.GetInsertChannel() == plan.GetChannel() &&
isFlushState(info.GetState()) &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetStartPosition().GetTimestamp() < t.GetPos().GetTimestamp()
info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp()
}))
if len(sealedSegments) == 0 {
// TO-DO fast finish l0 segment, just drop l0 segment
log.Info("l0Compaction available non-L0 Segments is empty ")
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", t.GetPos())
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos())
}
for _, segInfo := range sealedSegments {
@ -359,29 +346,29 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
log.Info("l0CompactionTask refreshed level zero compaction plan",
zap.Any("target position", t.GetPos()),
zap.Any("target position", taskProto.GetPos()),
zap.Any("target segments count", len(sealedSegBinlogs)))
return plan, nil
}
func (t *l0CompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
t.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
}
func (t *l0CompactionTask) hasAssignedWorker() bool {
return t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID
return t.GetTaskProto().GetNodeID() != 0 && t.GetTaskProto().GetNodeID() != NullNodeID
}
func (t *l0CompactionTask) checkTimeout() bool {
if t.GetTimeoutInSeconds() > 0 {
start := time.Unix(t.GetStartTime(), 0)
if t.GetTaskProto().GetTimeoutInSeconds() > 0 {
start := time.Unix(t.GetTaskProto().GetStartTime(), 0)
diff := time.Since(start).Seconds()
if diff > float64(t.GetTimeoutInSeconds()) {
if diff > float64(t.GetTaskProto().GetTimeoutInSeconds()) {
log.Warn("compaction timeout",
zap.Int64("taskID", t.GetTriggerID()),
zap.Int64("planID", t.GetPlanID()),
zap.Int64("nodeID", t.GetNodeID()),
zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()),
zap.Int64("taskID", t.GetTaskProto().GetTriggerID()),
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64("nodeID", t.GetTaskProto().GetNodeID()),
zap.Int32("timeout in seconds", t.GetTaskProto().GetTimeoutInSeconds()),
zap.Time("startTime", start),
)
return true
@ -395,7 +382,7 @@ func (t *l0CompactionTask) SetNodeID(id UniqueID) error {
}
func (t *l0CompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.CompactionTask)
return t.saveTaskMeta(t.GetTaskProto())
}
func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
@ -404,7 +391,7 @@ func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) erro
if err != nil {
return err
}
t.CompactionTask = task
t.SetTask(task)
return nil
}
@ -419,12 +406,12 @@ func (t *l0CompactionTask) saveSegmentMeta() error {
operators = append(operators, AddBinlogsOperator(seg.GetSegmentID(), nil, nil, seg.GetDeltalogs(), nil))
}
for _, segID := range t.InputSegments {
for _, segID := range t.GetTaskProto().InputSegments {
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped), UpdateCompactedOperator(segID))
}
log.Info("meta update: update segments info for level zero compaction",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
)
return t.meta.UpdateSegmentsInfo(operators...)
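
The struct change above is the heart of the race fix: l0CompactionTask no longer embeds *datapb.CompactionTask, it keeps the proto inside an atomic.Value, every read goes through GetTaskProto(), and longer methods such as BuildCompactionRequest load one snapshot into a local taskProto variable so a concurrent SetTask cannot swap the proto halfway through the function. Below is a minimal, self-contained sketch of the before/after behaviour under the race detector; the names are illustrative only, not the datacoord API, and it can be exercised with go run -race.

package main

import (
	"sync"
	"sync/atomic"
)

type compactionMeta struct {
	State int32
}

// racyTask mimics the old layout: the proto is embedded and its fields are
// written in place while other goroutines read them.
type racyTask struct {
	meta *compactionMeta
}

// safeTask mimics the new layout: readers load an immutable snapshot and the
// writer publishes a fresh copy, so no field is read and written concurrently.
type safeTask struct {
	meta atomic.Value // *compactionMeta
}

func main() {
	var wg sync.WaitGroup

	r := &racyTask{meta: &compactionMeta{}}
	wg.Add(2)
	go func() { defer wg.Done(); r.meta.State = 1 }() // unsynchronized write
	go func() { defer wg.Done(); _ = r.meta.State }() // unsynchronized read: -race typically reports this pair
	wg.Wait()

	s := &safeTask{}
	s.meta.Store(&compactionMeta{})
	wg.Add(2)
	go func() { defer wg.Done(); s.meta.Store(&compactionMeta{State: 1}) }()   // copy-on-write publish
	go func() { defer wg.Done(); _ = s.meta.Load().(*compactionMeta).State }() // snapshot read
	wg.Wait()
}

The same reasoning explains why BuildCompactionRequest now copies the proto into a local taskProto once at the top: it pins a single published snapshot for the whole plan construction.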


@ -50,6 +50,7 @@ func (s *L0CompactionTaskSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
s.mockAlloc = allocator.NewMockAllocator(s.T())
//s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
}
func (s *L0CompactionTaskSuite) SetupSubTest() {
@ -89,19 +90,16 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() {
Deltalogs: deltaLogs,
}}
}).Times(2)
task := &l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{100, 101},
},
meta: s.mockMeta,
}
task := newL0CompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{100, 101},
}, nil, s.mockMeta, nil)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
@ -121,20 +119,17 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() {
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return nil
}).Once()
task := &l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
InputSegments: []int64{102},
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Channel: channel,
Type: datapb.CompactionType_Level0DeleteCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
},
meta: s.mockMeta,
}
task := newL0CompactionTask(&datapb.CompactionTask{
InputSegments: []int64{102},
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Channel: channel,
Type: datapb.CompactionType_Level0DeleteCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
}, nil, s.mockMeta, nil)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
@ -158,19 +153,16 @@ func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() {
}).Times(2)
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(nil).Once()
task := &l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{100, 101},
},
meta: s.mockMeta,
}
task := newL0CompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{100, 101},
}, nil, s.mockMeta, nil)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
@ -206,38 +198,35 @@ func (s *L0CompactionTaskSuite) TestBuildCompactionRequestFailed_AllocFailed() {
}
func (s *L0CompactionTaskSuite) generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask {
return &l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
NodeID: NullNodeID,
State: state,
Channel: "ch-1",
InputSegments: []int64{100, 101},
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
allocator: s.mockAlloc,
}
return newL0CompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
NodeID: NullNodeID,
State: state,
Channel: "ch-1",
InputSegments: []int64{100, 101},
}, s.mockAlloc, s.mockMeta, s.mockSessMgr)
}
func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
s.Run("test pipelining needReassignNodeID", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = NullNodeID
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_pipelining, t.State)
s.EqualValues(NullNodeID, t.NodeID)
s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().State)
s.EqualValues(NullNodeID, t.GetTaskProto().NodeID)
})
s.Run("test pipelining BuildCompactionRequest failed", func() {
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
t.updateAndSaveTaskMeta(setNodeID(100))
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
@ -260,19 +249,19 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
Deltalogs: deltaLogs,
}}
}).Twice()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return()
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.State)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().State)
})
s.Run("test pipelining saveTaskMeta failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
t.NodeID = 100
t.updateAndSaveTaskMeta(setNodeID(100))
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
@ -298,13 +287,14 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_pipelining, t.State)
s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().State)
})
s.Run("test pipelining Compaction failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
t.updateAndSaveTaskMeta(setNodeID(100))
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
@ -330,20 +320,21 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error {
s.Require().EqualValues(t.NodeID, nodeID)
s.Require().EqualValues(t.GetTaskProto().NodeID, nodeID)
return errors.New("mock error")
})
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_pipelining, t.State)
s.EqualValues(NullNodeID, t.NodeID)
s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().State)
s.EqualValues(NullNodeID, t.GetTaskProto().NodeID)
})
s.Run("test pipelining success", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockAlloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
t.updateAndSaveTaskMeta(setNodeID(100))
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
@ -366,56 +357,57 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
Deltalogs: deltaLogs,
}}
}).Twice()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
s.mockSessMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error {
s.Require().EqualValues(t.NodeID, nodeID)
s.Require().EqualValues(t.GetTaskProto().NodeID, nodeID)
return nil
})
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState())
})
// stay in executing state when GetCompactionPlanResults error except ErrNodeNotFound
s.Run("test executing GetCompactionPlanResult fail NodeNotFound", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(t.NodeID)).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(t.GetTaskProto().NodeID)).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_pipelining, t.GetState())
s.EqualValues(NullNodeID, t.GetNodeID())
s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().GetState())
s.EqualValues(NullNodeID, t.GetTaskProto().GetNodeID())
})
// stay in executing state when GetCompactionPlanResults error except ErrNodeNotFound
s.Run("test executing GetCompactionPlanResult fail mock error", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).Return(nil, errors.New("mock error")).Times(12)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).Return(nil, errors.New("mock error")).Times(12)
for i := 0; i < 12; i++ {
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
s.EqualValues(100, t.GetNodeID())
s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState())
s.EqualValues(100, t.GetTaskProto().GetNodeID())
}
})
s.Run("test executing with result executing", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
PlanID: t.GetTaskProto().GetPlanID(),
State: datapb.CompactionTaskState_executing,
}, nil).Twice()
@ -423,52 +415,52 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
s.False(got)
// test timeout
t.StartTime = time.Now().Add(-time.Hour).Unix()
t.TimeoutInSeconds = 10
t.updateAndSaveTaskMeta(setStartTime(time.Now().Add(-time.Hour).Unix()), setTimeoutInSeconds(10))
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).
RunAndReturn(func(inputs []int64, compacting bool) {
s.ElementsMatch(inputs, t.GetInputSegments())
s.ElementsMatch(inputs, t.GetTaskProto().GetInputSegments())
s.False(compacting)
}).Once()
got = t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState())
})
s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
PlanID: t.GetTaskProto().GetPlanID(),
State: datapb.CompactionTaskState_executing,
}, nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
t.StartTime = time.Now().Add(-time.Hour).Unix()
t.TimeoutInSeconds = 10
t.updateAndSaveTaskMeta(setStartTime(time.Now().Add(-time.Hour).Unix()), setTimeoutInSeconds(10))
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState())
})
s.Run("test executing with result completed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
PlanID: t.GetTaskProto().GetPlanID(),
State: datapb.CompactionTaskState_completed,
}, nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil)
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
@ -476,16 +468,17 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_completed, t.GetState())
s.Equal(datapb.CompactionTaskState_completed, t.GetTaskProto().GetState())
})
s.Run("test executing with result completed save segment meta failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
PlanID: t.GetTaskProto().GetPlanID(),
State: datapb.CompactionTaskState_completed,
}, nil).Once()
@ -494,16 +487,17 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState())
})
s.Run("test executing with result completed save compaction meta failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
PlanID: t.GetTaskProto().GetPlanID(),
State: datapb.CompactionTaskState_completed,
}, nil).Once()
@ -512,53 +506,53 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState())
})
s.Run("test executing with result failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
PlanID: t.GetTaskProto().GetPlanID(),
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState())
})
s.Run("test executing with result failed save compaction meta failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.GetTaskProto().NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
PlanID: t.GetTaskProto().GetPlanID(),
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState())
})
s.Run("test timeout", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_timeout)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
s.Require().False(isCompacting)
s.ElementsMatch(segIDs, t.GetInputSegments())
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
}).Once()
got := t.Process()
@ -566,93 +560,96 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
})
s.Run("test metaSaved success", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
t.result = &datapb.CompactionPlanResult{}
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetInputSegments())
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
}).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_completed, t.GetState())
s.Equal(datapb.CompactionTaskState_completed, t.GetTaskProto().GetState())
})
s.Run("test metaSaved failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
t.result = &datapb.CompactionPlanResult{}
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_meta_saved, t.GetState())
s.Equal(datapb.CompactionTaskState_meta_saved, t.GetTaskProto().GetState())
})
s.Run("test complete drop failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_completed)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
t.result = &datapb.CompactionPlanResult{}
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetInputSegments())
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
}).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_completed, t.GetState())
s.Equal(datapb.CompactionTaskState_completed, t.GetTaskProto().GetState())
})
s.Run("test complete success", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_completed)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
t.result = &datapb.CompactionPlanResult{}
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetInputSegments())
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
}).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_completed, t.GetState())
s.Equal(datapb.CompactionTaskState_completed, t.GetTaskProto().GetState())
})
s.Run("test process failed success", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once()
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetInputSegments())
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
}).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState())
})
s.Run("test process failed failed", func() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
t.updateAndSaveTaskMeta(setNodeID(100))
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetInputSegments())
s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments())
}).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState())
})
s.Run("test unknown task", func() {
@ -664,6 +661,7 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
}
func (s *L0CompactionTaskSuite) TestSetterGetter() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
span := t.GetSpan()
@ -681,8 +679,8 @@ func (s *L0CompactionTaskSuite) TestSetterGetter() {
label := t.GetLabel()
s.Equal("10-ch-1", label)
t.SetStartTime(100)
s.EqualValues(100, t.GetStartTime())
t.updateAndSaveTaskMeta(setStartTime(100))
s.EqualValues(100, t.GetTaskProto().GetStartTime())
t.SetTask(nil)
t.SetPlan(&datapb.CompactionPlan{PlanID: 19530})
@ -693,52 +691,6 @@ func (s *L0CompactionTaskSuite) TestSetterGetter() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t.SetNodeID(1000)
s.EqualValues(1000, t.GetNodeID())
})
}
func (s *L0CompactionTaskSuite) TestCleanLogPath() {
s.Run("plan nil", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.CleanLogPath()
})
s.Run("clear path", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.SetPlan(&datapb.CompactionPlan{
Channel: "ch-1",
Type: datapb.CompactionType_MixCompaction,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100,
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 4)},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 5)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 6)},
},
},
PlanID: 19530,
})
t.SetResult(&datapb.CompactionPlanResult{
Segments: []*datapb.CompactionSegment{
{
SegmentID: 100,
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 4)},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 5)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 6)},
},
},
PlanID: 19530,
})
t.CleanLogPath()
s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetFieldBinlogs())
s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetField2StatslogPaths())
s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetDeltalogs())
s.Empty(t.GetResult().GetSegments()[0].GetInsertLogs())
s.Empty(t.GetResult().GetSegments()[0].GetField2StatslogPaths())
s.Empty(t.GetResult().GetSegments()[0].GetDeltalogs())
s.EqualValues(1000, t.GetTaskProto().GetNodeID())
})
}
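
The test helpers follow the same discipline: generateTestL0Task now builds tasks through newL0CompactionTask instead of a struct literal, which guarantees the atomic value is populated before any GetTaskProto() call; a task assembled by hand would get nil back from the accessor. A tiny hypothetical sketch of that constructor-versus-literal difference (names are illustrative, not the datacoord types):

package main

import (
	"fmt"
	"sync/atomic"
)

type taskProto struct{ PlanID int64 }

type task struct {
	taskProto atomic.Value // *taskProto
}

func (t *task) getTaskProto() *taskProto {
	v := t.taskProto.Load()
	if v == nil {
		return nil // nothing stored yet: a bare struct literal ends up here
	}
	return v.(*taskProto)
}

// newTask mirrors the constructor pattern: the proto is stored before the
// task is handed to anyone else.
func newTask(p *taskProto) *task {
	t := &task{}
	t.taskProto.Store(p)
	return t
}

func main() {
	bare := &task{} // like a struct-literal test task without the constructor
	fmt.Println(bare.getTaskProto() == nil)                               // true
	fmt.Println(newTask(&taskProto{PlanID: 19530}).getTaskProto().PlanID) // 19530
}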


@ -8,6 +8,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -22,9 +23,9 @@ import (
var _ CompactionTask = (*mixCompactionTask)(nil)
type mixCompactionTask struct {
*datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
taskProto atomic.Value // *datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span
allocator allocator.Allocator
@ -34,18 +35,27 @@ type mixCompactionTask struct {
slotUsage int64
}
func newMixCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager) *mixCompactionTask {
return &mixCompactionTask{
CompactionTask: t,
allocator: allocator,
meta: meta,
sessions: session,
slotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
func (t *mixCompactionTask) GetTaskProto() *datapb.CompactionTask {
task := t.taskProto.Load()
if task == nil {
return nil
}
return task.(*datapb.CompactionTask)
}
func newMixCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager) *mixCompactionTask {
task := &mixCompactionTask{
allocator: allocator,
meta: meta,
sessions: session,
slotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
}
task.taskProto.Store(t)
return task
}
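The atomic.Value-backed accessor above is the heart of the data-race fix: the task proto is only ever replaced wholesale, never mutated in place, so the scheduler and the state machine can read it from different goroutines. Below is a minimal, self-contained sketch of the pattern; the demoTask and demoCompactionTask names are hypothetical and not part of this change. The go.uber.org/atomic Value used in the PR exposes the same Load/Store API as the standard library's sync/atomic.Value shown here.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// demoTask stands in for *datapb.CompactionTask in this sketch.
type demoTask struct {
	PlanID int64
	State  string
}

// demoCompactionTask mirrors the accessor pattern: the proto lives in an
// atomic.Value and is only ever swapped as a whole.
type demoCompactionTask struct {
	taskProto atomic.Value // always holds a *demoTask
}

func (t *demoCompactionTask) GetTaskProto() *demoTask {
	v := t.taskProto.Load()
	if v == nil {
		return nil
	}
	return v.(*demoTask)
}

func (t *demoCompactionTask) SetTask(task *demoTask) {
	t.taskProto.Store(task)
}

func main() {
	t := &demoCompactionTask{}
	t.SetTask(&demoTask{PlanID: 1, State: "pipelining"})

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer: publish a cloned-and-updated proto
		defer wg.Done()
		next := *t.GetTaskProto() // shallow copy stands in for proto.Clone
		next.State = "executing"
		t.SetTask(&next)
	}()
	go func() { // reader: observes the old or the new proto, never a torn one
		defer wg.Done()
		fmt.Println(t.GetTaskProto().State)
	}()
	wg.Wait()
}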
func (t *mixCompactionTask) processPipelining() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("nodeID", t.GetNodeID()))
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()), zap.Int64("nodeID", t.GetTaskProto().GetNodeID()))
if t.NeedReAssignNodeID() {
log.Info("mixCompactionTask need assign nodeID")
return false
@ -63,7 +73,7 @@ func (t *mixCompactionTask) processPipelining() bool {
return t.processFailed()
}
err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan())
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
@ -80,7 +90,7 @@ func (t *mixCompactionTask) processPipelining() bool {
}
func (t *mixCompactionTask) processMetaSaved() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
return false
@ -90,8 +100,8 @@ func (t *mixCompactionTask) processMetaSaved() bool {
}
func (t *mixCompactionTask) processExecuting() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
result, err := t.sessions.GetCompactionPlanResult(t.GetTaskProto().GetNodeID(), t.GetTaskProto().GetPlanID())
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)); err != nil {
@ -104,7 +114,7 @@ func (t *mixCompactionTask) processExecuting() bool {
switch result.GetState() {
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
log.Info("mixCompactionTask timeout", zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()), zap.Int64("startTime", t.GetStartTime()))
log.Info("mixCompactionTask timeout", zap.Int32("timeout in seconds", t.GetTaskProto().GetTimeoutInSeconds()), zap.Int64("startTime", t.GetTaskProto().GetStartTime()))
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err != nil {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
@ -157,13 +167,13 @@ func (t *mixCompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
}
func (t *mixCompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.CompactionTask)
return t.saveTaskMeta(t.GetTaskProto())
}
func (t *mixCompactionTask) saveSegmentMeta() error {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
// Also prepare metric updates.
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.taskProto.Load().(*datapb.CompactionTask), t.result)
if err != nil {
return err
}
@ -177,10 +187,10 @@ func (t *mixCompactionTask) saveSegmentMeta() error {
// Note: returning true means exiting this state machine.
// ONLY processCompleted and processFailed may return true.
func (t *mixCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
lastState := t.GetState().String()
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
lastState := t.GetTaskProto().GetState().String()
processResult := true
switch t.GetState() {
switch t.GetTaskProto().GetState() {
case datapb.CompactionTaskState_pipelining:
processResult = t.processPipelining()
case datapb.CompactionTaskState_executing:
@ -194,13 +204,17 @@ func (t *mixCompactionTask) Process() bool {
case datapb.CompactionTaskState_failed:
processResult = t.processFailed()
}
currentState := t.GetState().String()
currentState := t.GetTaskProto().GetState().String()
if currentState != lastState {
log.Info("mix compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState))
}
return processResult
}
func (t *mixCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
}
func (t *mixCompactionTask) GetResult() *datapb.CompactionPlanResult {
return t.result
}
@ -210,17 +224,17 @@ func (t *mixCompactionTask) GetPlan() *datapb.CompactionPlan {
}
func (t *mixCompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
return fmt.Sprintf("%d-%s", t.taskProto.Load().(*datapb.CompactionTask).PartitionID, t.GetTaskProto().GetChannel())
}
func (t *mixCompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
return t.GetTaskProto().GetState() == datapb.CompactionTaskState_pipelining && (t.GetTaskProto().GetNodeID() == 0 || t.GetTaskProto().GetNodeID() == NullNodeID)
}
func (t *mixCompactionTask) processCompleted() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("mixCompactionTask processCompleted unable to drop compaction plan")
}
@ -233,7 +247,7 @@ func (t *mixCompactionTask) processCompleted() bool {
}
func (t *mixCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
t.meta.SetSegmentsCompacting(t.taskProto.Load().(*datapb.CompactionTask).GetInputSegments(), false)
}
func (t *mixCompactionTask) processTimeout() bool {
@ -247,7 +261,7 @@ func (t *mixCompactionTask) processTimeout() bool {
}
func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
taskClone := proto.Clone(t).(*datapb.CompactionTask)
taskClone := proto.Clone(t.GetTaskProto()).(*datapb.CompactionTask)
for _, opt := range opts {
opt(taskClone)
}
@ -255,9 +269,9 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa
}
func (t *mixCompactionTask) processFailed() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err))
}
@ -273,9 +287,9 @@ func (t *mixCompactionTask) processFailed() bool {
}
func (t *mixCompactionTask) checkTimeout() bool {
if t.GetTimeoutInSeconds() > 0 {
diff := time.Since(time.Unix(t.GetStartTime(), 0)).Seconds()
if diff > float64(t.GetTimeoutInSeconds()) {
if t.GetTaskProto().GetTimeoutInSeconds() > 0 {
diff := time.Since(time.Unix(t.GetTaskProto().GetStartTime(), 0)).Seconds()
if diff > float64(t.GetTaskProto().GetTimeoutInSeconds()) {
return true
}
}
@ -288,7 +302,7 @@ func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) err
if err != nil {
return err
}
t.CompactionTask = task
t.SetTask(task)
return nil
}
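The write path pairs with that read path: updateAndSaveTaskMeta clones the current proto (ShadowClone goes through proto.Clone), applies the option mutators, persists the clone, and only then publishes it through SetTask, so the visible proto is never half-updated and stays untouched if persistence fails. A condensed sketch of this copy-on-write sequence, reusing the hypothetical demo types from the earlier sketch (updateAndSave and setDemoState are illustrative names, not the PR's API):

type taskOpt func(*demoTask)

func setDemoState(state string) taskOpt {
	return func(t *demoTask) { t.State = state }
}

func (t *demoCompactionTask) updateAndSave(persist func(*demoTask) error, opts ...taskOpt) error {
	clone := *t.GetTaskProto() // shallow copy stands in for proto.Clone / ShadowClone
	for _, opt := range opts {
		opt(&clone)
	}
	if err := persist(&clone); err != nil {
		return err // on failure the published proto is left untouched
	}
	t.SetTask(&clone) // publish only after the clone has been persisted
	return nil
}

Usage would look like t.updateAndSave(persistFn, setDemoState("meta_saved")), mirroring updateAndSaveTaskMeta(setState(...)) in the real code.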
@ -301,66 +315,37 @@ func (t *mixCompactionTask) GetSpan() trace.Span {
}
func (t *mixCompactionTask) SetTask(task *datapb.CompactionTask) {
t.CompactionTask = task
t.taskProto.Store(task)
}
func (t *mixCompactionTask) SetSpan(span trace.Span) {
t.span = span
}
func (t *mixCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
}
func (t *mixCompactionTask) EndSpan() {
if t.span != nil {
t.span.End()
}
}
func (t *mixCompactionTask) CleanLogPath() {
if t.plan == nil {
return
}
if t.plan.GetSegmentBinlogs() != nil {
for _, binlogs := range t.plan.GetSegmentBinlogs() {
binlogs.FieldBinlogs = nil
binlogs.Field2StatslogPaths = nil
binlogs.Deltalogs = nil
}
}
if t.result.GetSegments() != nil {
for _, segment := range t.result.GetSegments() {
segment.InsertLogs = nil
segment.Deltalogs = nil
segment.Field2StatslogPaths = nil
}
}
}
func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
return nil, err
}
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
Channel: t.GetChannel(),
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
PlanID: taskProto.GetPlanID(),
StartTime: taskProto.GetStartTime(),
TimeoutInSeconds: taskProto.GetTimeoutInSeconds(),
Type: taskProto.GetType(),
Channel: taskProto.GetChannel(),
CollectionTtl: taskProto.GetCollectionTtl(),
TotalRows: taskProto.GetTotalRows(),
Schema: taskProto.GetSchema(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: t.GetPreAllocatedSegmentIDs(),
PreAllocatedSegmentIDs: taskProto.GetPreAllocatedSegmentIDs(),
SlotUsage: t.GetSlotUsage(),
MaxSize: t.GetMaxSize(),
MaxSize: taskProto.GetMaxSize(),
}
segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
for _, segID := range t.GetInputSegments() {
for _, segID := range taskProto.GetInputSegments() {
segInfo := t.meta.GetHealthySegment(segID)
if segInfo == nil {
return nil, merr.WrapErrSegmentNotFound(segID)
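Note how BuildCompactionRequest now loads the proto once into a local taskProto variable and builds the whole plan from that snapshot, so every field comes from a single consistent version of the task even if a writer swaps in a new proto mid-build. A tiny sketch of the same snapshot-then-build idea, again with the hypothetical demo types:

func (t *demoCompactionTask) buildSummary() string {
	snapshot := t.GetTaskProto() // load once; every field below comes from the same version
	return fmt.Sprintf("plan=%d state=%s", snapshot.PlanID, snapshot.State)
}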

View File

@ -1,18 +1,37 @@
package datacoord
import (
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalMix() {
func TestMixCompactionTaskSuite(t *testing.T) {
suite.Run(t, new(MixCompactionTaskSuite))
}
type MixCompactionTaskSuite struct {
suite.Suite
mockMeta *MockCompactionMeta
mockSessMgr *session.MockDataNodeManager
}
func (s *MixCompactionTaskSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
}
func (s *MixCompactionTaskSuite) TestProcessRefreshPlan_NormalMix() {
channel := "Ch-1"
binLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
@ -24,21 +43,17 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalMix() {
Binlogs: binLogs,
}}
}).Times(2)
task := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_MixCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{200, 201},
ResultSegments: []int64{100, 200},
},
// plan: plan,
meta: s.mockMeta,
}
task := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_MixCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{200, 201},
ResultSegments: []int64{100, 200},
}, nil, s.mockMeta, nil)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
@ -52,27 +67,24 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalMix() {
s.ElementsMatch([]int64{200, 201}, segIDs)
}
func (s *CompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() {
func (s *MixCompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() {
channel := "Ch-1"
s.Run("segment_not_found", func() {
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return nil
}).Once()
task := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Channel: channel,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_executing,
NodeID: 1,
InputSegments: []int64{200, 201},
ResultSegments: []int64{100, 200},
},
meta: s.mockMeta,
}
task := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Channel: channel,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_executing,
NodeID: 1,
InputSegments: []int64{200, 201},
ResultSegments: []int64{100, 200},
}, nil, s.mockMeta, nil)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(int64(1)).Return(19530, 99999, nil)
task.allocator = alloc
@ -82,7 +94,7 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() {
})
}
func (s *CompactionTaskSuite) TestCompactionTimeout() {
func (s *MixCompactionTaskSuite) TestCompactionTimeout() {
channel := "Ch-1"
binLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
@ -98,23 +110,18 @@ func (s *CompactionTaskSuite) TestCompactionTimeout() {
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything)
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
task := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_MixCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{200, 201},
ResultSegments: []int64{100, 200},
TimeoutInSeconds: 1,
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
allocator: alloc,
}
task := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_MixCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{200, 201},
ResultSegments: []int64{100, 200},
TimeoutInSeconds: 1,
}, alloc, s.mockMeta, s.mockSessMgr)
plan, err := task.BuildCompactionRequest()
task.plan = plan
s.Require().NoError(err)
@ -125,5 +132,5 @@ func (s *CompactionTaskSuite) TestCompactionTimeout() {
}, nil)
end := task.processExecuting()
s.Equal(true, end)
s.Equal(datapb.CompactionTaskState_cleaned, task.State)
s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().State)
}
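The test rewrites above follow from the new layout: a mixCompactionTask built as a bare struct literal would leave taskProto empty and GetTaskProto would return nil, so tests now construct tasks through newMixCompactionTask. A sketch of the constructor idea with the demo types (newDemoCompactionTask is hypothetical):

func newDemoCompactionTask(proto *demoTask) *demoCompactionTask {
	t := &demoCompactionTask{}
	t.taskProto.Store(proto) // store up front so every later read sees a valid proto
	return t
}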

View File

@ -1,25 +0,0 @@
package datacoord
import (
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/datacoord/session"
)
func TestCompactionTaskSuite(t *testing.T) {
suite.Run(t, new(CompactionTaskSuite))
}
type CompactionTaskSuite struct {
suite.Suite
mockMeta *MockCompactionMeta
mockSessMgr *session.MockDataNodeManager
}
func (s *CompactionTaskSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
}

File diff suppressed because it is too large

View File

@ -64,10 +64,7 @@ func (h *spyCompactionHandler) removeTasksByChannel(channel string) {}
// enqueueCompaction starts to execute the plan and returns immediately
func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) error {
t := &mixCompactionTask{
CompactionTask: task,
meta: h.meta,
}
t := newMixCompactionTask(task, nil, h.meta, nil)
alloc := newMock0Allocator(h.t)
t.allocator = alloc
plan, err := t.BuildCompactionRequest()
@ -587,6 +584,7 @@ func Test_compactionTrigger_force(t *testing.T) {
Schema: schema,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 101, End: 200},
MaxSize: 1342177280,
SlotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
},
},
},