fix: [cherry-pick] Skip changing meta if nodeID does not match the channel (#31672)

See also: #31648
pr: #31665, #31694

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-04-10 15:09:18 +08:00 committed by GitHub
parent 90bed1caf9
commit aad3ed3835
7 changed files with 74 additions and 23 deletions
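In short: when a DataNode reports a completed compaction for a channel it no longer owns (the channel was balanced away while the task was running), the coordinator must not apply the result to meta; it marks the plan failed and only asks the node to drop its cached task. A minimal, self-contained sketch of that ownership check (all names below are illustrative stand-ins, not Milvus APIs):

```go
package main

import "fmt"

// channelOwner models the coordinator's channel-manager view: which DataNode
// currently owns which vchannel. Illustrative stand-in, not a Milvus type.
type channelOwner map[string]int64

// shouldApplyResult mirrors the new chManager.Match guard: a completed
// compaction result is applied to meta only when the reporting node still
// owns the plan's channel; otherwise the plan is marked failed and skipped.
func shouldApplyResult(owners channelOwner, reportingNode int64, channel string) bool {
	return owners[channel] == reportingNode
}

func main() {
	owners := channelOwner{"ch-1": 111, "ch-2": 222} // ch-2 was balanced away from node 111

	fmt.Println(shouldApplyResult(owners, 111, "ch-1")) // true: safe to complete the compaction
	fmt.Println(shouldApplyResult(owners, 111, "ch-2")) // false: stale result, skip the meta change
}
```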


@@ -540,14 +540,36 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// TODO reduce the lock range
c.mu.Lock()
for _, task := range executingTasks {
- log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
+ log := log.With(
+ zap.Int64("planID", task.plan.PlanID),
+ zap.Int64("nodeID", task.dataNodeID),
+ zap.String("channel", task.plan.GetChannel()))
planID := task.plan.PlanID
cachedPlans = append(cachedPlans, planID)
if nodePlan, ok := planStates[planID]; ok {
planResult := nodePlan.B
switch planResult.GetState() {
case commonpb.CompactionState_Completed:
log.Info("start to complete compaction")
+ // channels are balanced to other nodes, yet the old datanode still has the compaction results
+ // task.dataNodeID == planState.A, but
+ // task.dataNodeID no longer matches the channel
+ // Mark this compaction as failed and skip processing the meta
+ if !c.chManager.Match(task.dataNodeID, task.plan.GetChannel()) {
+ // Sync segments without CompactionFrom segment IDs to make sure DN clears the task
+ // without changing the meta
+ log.Warn("compaction failed for channel nodeID not match")
+ if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil {
+ log.Warn("compaction failed to sync segments with node", zap.Error(err))
+ continue
+ }
+ c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
+ c.setSegmentsCompacting(task.plan, false)
+ c.scheduler.Finish(task.dataNodeID, task.plan)
+ }
if err := c.completeCompaction(planResult); err != nil {
log.Warn("fail to complete compaction", zap.Error(err))
}
@@ -574,7 +596,12 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Timeout tasks will time out and fail in the DataNode;
// we need to wait for the DataNode to report the failure and clean up the status.
for _, task := range timeoutTasks {
- log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
+ log := log.With(
+ zap.Int64("planID", task.plan.PlanID),
+ zap.Int64("nodeID", task.dataNodeID),
+ zap.String("channel", task.plan.GetChannel()),
+ )
planID := task.plan.PlanID
cachedPlans = append(cachedPlans, planID)
if nodePlan, ok := planStates[planID]; ok {
@@ -601,17 +628,15 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
for _, planID := range unkonwnPlansInWorker {
if nodeUnkonwnPlan, ok := completedPlans[planID]; ok {
nodeID, plan := nodeUnkonwnPlan.A, nodeUnkonwnPlan.B
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel()))
// Sync segments without CompactionFrom segment IDs to make sure DN clears the task
// without changing the meta
- req := &datapb.SyncSegmentsRequest{
+ log.Info("compaction syncing unknown plan with node")
+ if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{
PlanID: planID,
ChannelName: plan.GetChannel(),
- }
- log.Info("compaction syncing unknown plan with node")
- if err := c.sessions.SyncSegments(nodeID, req); err != nil {
+ }); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
return err
}
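In both the mismatch branch and the unknown-plan branch above, the coordinator still needs the DataNode to forget the task. The sketch below models that drop-only sync request; the struct is a reduced stand-in for datapb.SyncSegmentsRequest and its fields are assumptions taken from the comments above, not the actual proto definition:

```go
package main

import "fmt"

// syncSegmentsRequest is a reduced, illustrative stand-in for
// datapb.SyncSegmentsRequest, keeping only what the drop-only path needs.
type syncSegmentsRequest struct {
	PlanID         int64
	ChannelName    string
	CompactionFrom []int64 // source segment IDs; left empty so no meta is changed
}

// dropOnlySync builds the request the coordinator sends when it only wants the
// DataNode to clear a cached compaction task: the plan and channel identify the
// task, and CompactionFrom stays empty, so there is nothing to apply to meta.
func dropOnlySync(planID int64, channel string) *syncSegmentsRequest {
	return &syncSegmentsRequest{PlanID: planID, ChannelName: channel}
}

func main() {
	req := dropOnlySync(6, "ch-2")
	fmt.Printf("drop-only sync: plan=%d channel=%s applyToMeta=%v\n",
		req.PlanID, req.ChannelName, len(req.CompactionFrom) > 0)
}
```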


@@ -687,33 +687,40 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
2: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}},
3: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 3, State: commonpb.CompactionState_Executing}},
5: {A: 222, B: &datapb.CompactionPlanResult{PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 5}}}},
+ 6: {A: 111, B: &datapb.CompactionPlanResult{Channel: "ch-2", PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 6}}}},
}, nil)
inPlans := map[int64]*compactionTask{
1: {
triggerInfo: &compactionSignal{},
- plan: &datapb.CompactionPlan{PlanID: 1},
+ plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1"},
state: executing,
dataNodeID: 111,
},
2: {
triggerInfo: &compactionSignal{},
- plan: &datapb.CompactionPlan{PlanID: 2},
+ plan: &datapb.CompactionPlan{PlanID: 2, Channel: "ch-1"},
state: executing,
dataNodeID: 111,
},
3: {
triggerInfo: &compactionSignal{},
- plan: &datapb.CompactionPlan{PlanID: 3},
+ plan: &datapb.CompactionPlan{PlanID: 3, Channel: "ch-1"},
state: timeout,
dataNodeID: 111,
},
4: {
triggerInfo: &compactionSignal{},
- plan: &datapb.CompactionPlan{PlanID: 4},
+ plan: &datapb.CompactionPlan{PlanID: 4, Channel: "ch-1"},
state: timeout,
dataNodeID: 111,
},
+ 6: {
+ triggerInfo: &compactionSignal{},
+ plan: &datapb.CompactionPlan{PlanID: 6, Channel: "ch-2"},
+ state: executing,
+ dataNodeID: 111,
+ },
}
s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
@@ -723,6 +730,9 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
s.EqualValues(5, req.GetPlanID())
return nil
}).Once()
+ s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
+ s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true)
+ s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans = inPlans
@@ -744,6 +754,9 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
task = handler.plans[4]
s.Equal(failed, task.state)
+ task = handler.plans[6]
+ s.Equal(failed, task.state)
}
func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {


@@ -18,6 +18,7 @@ package datanode
import (
"context"
"sync"
"github.com/samber/lo"
"go.uber.org/zap"
@@ -38,6 +39,10 @@ type compactionExecutor struct {
completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult
taskCh chan compactor
dropped *typeutil.ConcurrentSet[string] // vchannel dropped
+ // To prevent a channel release from racing with compaction result collection,
+ // all compaction tasks of a released channel will be discarded
+ resultGuard sync.RWMutex
}
func newCompactionExecutor() *compactionExecutor {
@@ -127,10 +132,15 @@ func (c *compactionExecutor) isValidChannel(channel string) bool {
return !c.dropped.Contain(channel)
}
- func (c *compactionExecutor) clearTasksByChannel(channel string) {
+ func (c *compactionExecutor) discardByDroppedChannel(channel string) {
c.dropped.Insert(channel)
+ c.discardPlan(channel)
+ }
+ func (c *compactionExecutor) discardPlan(channel string) {
+ c.resultGuard.Lock()
+ defer c.resultGuard.Unlock()
// stop executing tasks of channel
c.executing.Range(func(planID int64, task compactor) bool {
if task.getChannelName() == channel {
c.stopTask(planID)
@@ -142,7 +152,7 @@ func (c *compactionExecutor) clearTasksByChannel(channel string) {
c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool {
if result.GetChannel() == channel {
c.injectDone(planID)
log.Info("remove compaction results for dropped channel",
log.Info("remove compaction plan and results",
zap.String("channel", channel),
zap.Int64("planID", planID))
}
@@ -151,6 +161,8 @@ func (c *compactionExecutor) clearTasksByChannel(channel string) {
}
func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanResult {
+ c.resultGuard.RLock()
+ defer c.resultGuard.RUnlock()
var (
executing []int64
completed []int64
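The resultGuard added above is what prevents the remaining race on the DataNode side: releasing a channel discards its plans under the write lock, while result collection for the coordinator runs under the read lock, so a released channel's result cannot be reported once the discard has started. A minimal sketch of that locking pattern, with the executor reduced to an illustrative stand-in:

```go
package main

import (
	"fmt"
	"sync"
)

// executorSketch stands in for the DataNode's compaction executor;
// completed maps planID -> channel in place of real CompactionPlanResults.
type executorSketch struct {
	resultGuard sync.RWMutex
	completed   map[int64]string
}

// discardPlan drops every cached result of a released channel under the
// write lock, so no reader can observe them once the discard has begun.
func (e *executorSketch) discardPlan(channel string) {
	e.resultGuard.Lock()
	defer e.resultGuard.Unlock()
	for planID, ch := range e.completed {
		if ch == channel {
			delete(e.completed, planID)
		}
	}
}

// getAllCompactionResults collects results under the read lock, so it either
// runs entirely before or entirely after a concurrent discardPlan.
func (e *executorSketch) getAllCompactionResults() []int64 {
	e.resultGuard.RLock()
	defer e.resultGuard.RUnlock()
	plans := make([]int64, 0, len(e.completed))
	for planID := range e.completed {
		plans = append(plans, planID)
	}
	return plans
}

func main() {
	e := &executorSketch{completed: map[int64]string{1: "ch-1", 2: "ch-2"}}
	e.discardPlan("ch-2")                    // channel released: its result is dropped
	fmt.Println(e.getAllCompactionResults()) // only plans of still-owned channels remain
}
```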


@@ -83,7 +83,7 @@ func TestCompactionExecutor(t *testing.T) {
{expected: false, channel: "ch2", desc: "in dropped"},
}
ex := newCompactionExecutor()
ex.clearTasksByChannel("ch2")
ex.discardByDroppedChannel("ch2")
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
assert.Equal(t, test.expected, ex.isValidChannel(test.channel))
@@ -107,7 +107,7 @@ func TestCompactionExecutor(t *testing.T) {
found = ex.executing.Contain(mc.getPlanID())
}
ex.clearTasksByChannel("mock")
ex.discardByDroppedChannel("mock")
select {
case <-mc.ctx.Done():


@@ -320,10 +320,11 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
}
// tryToReleaseFlowgraph tries to release a flowgraph
- func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
- log.Info("try to release flowgraph", zap.String("vChanName", vChanName))
- node.flowgraphManager.RemoveFlowgraph(vChanName)
- node.writeBufferManager.RemoveChannel(vChanName)
+ func (node *DataNode) tryToReleaseFlowgraph(channel string) {
+ log.Info("try to release flowgraph", zap.String("channel", channel))
+ node.compactionExecutor.discardPlan(channel)
+ node.flowgraphManager.RemoveFlowgraph(channel)
+ node.writeBufferManager.RemoveChannel(channel)
}
// BackGroundGC runs in background to release datanode resources


@@ -154,7 +154,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
ddn.dropMode.Store(true)
log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName))
- ddn.compactionExecutor.clearTasksByChannel(ddn.vChannelName)
+ ddn.compactionExecutor.discardByDroppedChannel(ddn.vChannelName)
fgMsg.dropCollection = true
pChan := funcutil.ToPhysicalChannel(ddn.vChannelName)


@@ -313,7 +313,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName())
if !ok {
- node.compactionExecutor.clearTasksByChannel(req.GetChannelName())
+ node.compactionExecutor.discardPlan(req.GetChannelName())
err := merr.WrapErrChannelNotFound(req.GetChannelName())
log.Warn("failed to sync segments", zap.Error(err))
return merr.Status(err), nil