Apply DropVirtualChannel and FlushManager drop mode (#12563)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/12628/head
congqixia 2021-12-02 16:39:33 +08:00 committed by GitHub
parent 487bf9483e
commit 8f9e62fa18
20 changed files with 466 additions and 214 deletions

View File

@ -131,7 +131,9 @@ func (c *ChannelManager) unwatchDroppedChannels() {
err := c.remove(nodeChannel.NodeID, ch)
if err != nil {
log.Warn("unable to remove channel", zap.String("channel", ch.Name), zap.Error(err))
continue
}
c.h.FinishDropChannel(ch.Name)
}
}
}

View File

@ -16,6 +16,7 @@ type Handler interface {
// GetVChanPositions gets the recovery information needed for a channel
GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo
CheckShouldDropChannel(channel string) bool
FinishDropChannel(channel string)
}
// Handler is a helper of Server
@ -131,18 +132,24 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
}
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
segments := h.s.meta.GetSegmentsByChannel(channel)
for _, segment := range segments {
if segment.GetStartPosition() != nil && // filter empty segment
// FIXME: we filter compaction generated segments
// because datanode may not know the segment due to the network lag or
// datacoord crash when handling CompleteCompaction.
// FIXME: cancel this limitation for #12265
// need to change a unified DropAndFlush to solve the root problem
//len(segment.CompactionFrom) == 0 &&
segment.GetState() != commonpb.SegmentState_Dropped {
return false
/*
segments := h.s.meta.GetSegmentsByChannel(channel)
for _, segment := range segments {
if segment.GetStartPosition() != nil && // filter empty segment
// FIXME: we filter compaction generated segments
// because datanode may not know the segment due to the network lag or
// datacoord crash when handling CompleteCompaction.
// FIXME: cancel this limitation for #12265
// need to change a unified DropAndFlush to solve the root problem
//len(segment.CompactionFrom) == 0 &&
segment.GetState() != commonpb.SegmentState_Dropped {
return false
}
}
}
return true
return false*/
return h.s.meta.ChannelHasRemoveFlag(channel)
}
func (h *ServerHandler) FinishDropChannel(channel string) {
h.s.meta.FinishRemoveChannel(channel)
}
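
For orientation, here is a minimal standalone sketch of the removal-flag lifecycle these datacoord hunks introduce: write a tombstone when a channel's segments are drop-saved, check it from CheckShouldDropChannel, and clear it from FinishDropChannel once the channel is unwatched. The in-memory store, helper names, and the spelling removeFlagTombstone below are illustrative stand-ins for the etcd-backed meta client, not the actual datacoord types.

package main

import (
	"fmt"
	"sync"
)

// removeFlagTombstone mirrors the removeFlagTomestone constant in the meta hunk below.
const removeFlagTombstone = "removed"

// tombstoneStore is an in-memory stand-in for the etcd-backed meta kv client.
type tombstoneStore struct {
	mu sync.RWMutex
	kv map[string]string
}

func buildChannelRemovePath(channel string) string {
	return "datacoord-meta/channel-removal/" + channel
}

// markChannelRemoval is the step saveDropSegmentAndRemove performs inside its txn:
// persist a tombstone so a crash between "drop segments" and "unwatch channel" leaves a marker.
func (s *tombstoneStore) markChannelRemoval(channel string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.kv[buildChannelRemovePath(channel)] = removeFlagTombstone
}

// channelHasRemoveFlag mirrors meta.ChannelHasRemoveFlag: only the exact tombstone value counts.
func (s *tombstoneStore) channelHasRemoveFlag(channel string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.kv[buildChannelRemovePath(channel)] == removeFlagTombstone
}

// finishRemoveChannel mirrors meta.FinishRemoveChannel, invoked via FinishDropChannel
// once the channel has actually been unwatched.
func (s *tombstoneStore) finishRemoveChannel(channel string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.kv, buildChannelRemovePath(channel))
}

func main() {
	s := &tombstoneStore{kv: map[string]string{}}
	s.markChannelRemoval("ch1")
	fmt.Println(s.channelHasRemoveFlag("ch1")) // true: CheckShouldDropChannel would report the channel droppable
	s.finishRemoveChannel("ch1")
	fmt.Println(s.channelHasRemoveFlag("ch1")) // false: the drop is finished and the flag is gone
}

This matches how ChannelManager.unwatchDroppedChannels above calls FinishDropChannel only after remove succeeds: if datacoord restarts mid-removal, the tombstone is still there and the drop can be completed on the next pass.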

View File

@ -38,6 +38,8 @@ const (
segmentPrefix = metaPrefix + "/s"
channelRemovePrefix = metaPrefix + "/channel-removal"
handoffSegmentPrefix = "querycoord-handoff"
removeFlagTomestone = "removed"
)
type meta struct {
@ -460,7 +462,7 @@ func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*S
// add a removal flag into meta to guard against partial failure of the non-atomic channel removal
removalFlag := buildChannelRemovePath(channel)
kv[removalFlag] = ""
kv[removalFlag] = removeFlagTomestone
}
err := m.saveKvTxn(kv)
@ -482,6 +484,16 @@ func (m *meta) FinishRemoveChannel(channel string) error {
return m.client.Remove(key)
}
// ChannelHasRemoveFlag returns whether the channel has a removal flag (tombstone) saved in the kv store
func (m *meta) ChannelHasRemoveFlag(channel string) bool {
key := buildChannelRemovePath(channel)
v, err := m.client.Load(key)
if err != nil || v != removeFlagTomestone {
return false
}
return true
}
// ListSegmentFiles lists all segments' logs
func (m *meta) ListSegmentFiles() []string {
m.RLock()

View File

@ -602,3 +602,5 @@ func (h *mockHandler) GetVChanPositions(channel string, collectionID UniqueID, p
func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
return false
}
func (h *mockHandler) FinishDropChannel(channel string) {}

View File

@ -891,35 +891,35 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetReason())
})
/*
t.Run("test save dropped segment and remove channel", func(t *testing.T) {
spyCh := make(chan struct{}, 1)
svr := newTestServer(t, nil, SetSegmentManager(&spySegmentManager{spyCh: spyCh}))
defer closeTestServer(t, svr)
t.Run("test save dropped segment and remove channel", func(t *testing.T) {
spyCh := make(chan struct{}, 1)
svr := newTestServer(t, nil, SetSegmentManager(&spySegmentManager{spyCh: spyCh}))
defer closeTestServer(t, svr)
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 1})
err := svr.meta.AddSegment(&SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
},
})
assert.Nil(t, err)
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 1})
err := svr.meta.AddSegment(&SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
},
})
assert.Nil(t, err)
err = svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 1})
assert.Nil(t, err)
err = svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 1})
assert.Nil(t, err)
_, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{
SegmentID: 1,
Dropped: true,
})
assert.Nil(t, err)
<-spyCh
})
_, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{
SegmentID: 1,
Dropped: true,
})
assert.Nil(t, err)
<-spyCh
})*/
}
func TestDropVirtualChannel(t *testing.T) {
@ -1410,111 +1410,127 @@ func TestShouldDropChannel(t *testing.T) {
},
},
})
/*
s1 := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
Timestamp: 0,
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
}
s2 := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
CompactionFrom: []int64{4, 5},
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
s3 := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 2,
},
}
s4 := &datapb.SegmentInfo{
ID: 4,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
}*/
/*
t.Run("channel without segments", func(t *testing.T) {
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
s1 := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
Timestamp: 0,
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
}
s2 := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
CompactionFrom: []int64{4, 5},
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
s3 := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
StartPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 2,
},
}
s4 := &datapb.SegmentInfo{
ID: 4,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
}
})
t.Run("channel without segments", func(t *testing.T) {
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
t.Run("channel with all dropped segments", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s1))
require.NoError(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
})
t.Run("channel with all dropped segments and flushed compacted segments", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s2))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.False(t, r)
})
t.Run("channel with other state segments", func(t *testing.T) {
err := svr.meta.DropSegment(2)
require.Nil(t, err)
err = svr.meta.AddSegment(NewSegmentInfo(s3))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.False(t, r)
})
t.Run("channel with dropped segment and with segment without start position", func(t *testing.T) {
err := svr.meta.DropSegment(3)
require.Nil(t, err)
err = svr.meta.AddSegment(NewSegmentInfo(s4))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
})
*/
t.Run("channel name not in kv", func(t *testing.T) {
assert.False(t, svr.handler.CheckShouldDropChannel("ch99"))
})
t.Run("channel with all dropped segments", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s1))
t.Run("channel in remove flag", func(t *testing.T) {
key := buildChannelRemovePath("ch1")
err := svr.meta.client.Save(key, removeFlagTomestone)
require.NoError(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
assert.True(t, svr.handler.CheckShouldDropChannel("ch1"))
})
t.Run("channel with all dropped segments and flushed compacted segments", func(t *testing.T) {
err := svr.meta.AddSegment(NewSegmentInfo(s2))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.False(t, r)
})
t.Run("channel with other state segments", func(t *testing.T) {
err := svr.meta.DropSegment(2)
require.Nil(t, err)
err = svr.meta.AddSegment(NewSegmentInfo(s3))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.False(t, r)
})
t.Run("channel with dropped segment and with segment without start position", func(t *testing.T) {
err := svr.meta.DropSegment(3)
require.Nil(t, err)
err = svr.meta.AddSegment(NewSegmentInfo(s4))
require.Nil(t, err)
r := svr.handler.CheckShouldDropChannel("ch1")
assert.True(t, r)
t.Run("channel name not matched", func(t *testing.T) {
assert.False(t, svr.handler.CheckShouldDropChannel("ch2"))
})
}
@ -1683,7 +1699,6 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.EqualValues(t, 1, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetFieldID())
assert.ElementsMatch(t, []string{"/binlog/file1", "/binlog/file2"}, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs())
})
t.Run("with dropped segments", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

View File

@ -353,14 +353,16 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
zap.Any("meta", req.GetField2BinlogPaths()))
if req.GetDropped() && s.handler.CheckShouldDropChannel(channel) {
log.Debug("remove channel", zap.String("channel", channel))
err = s.channelManager.RemoveChannel(channel)
if err != nil {
log.Warn("failed to remove channel", zap.String("channel", channel), zap.Error(err))
}
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
}
// Drop logic handler in DropVirtualChannel
/*
if req.GetDropped() && s.handler.CheckShouldDropChannel(channel) {
log.Debug("remove channel", zap.String("channel", channel))
err = s.channelManager.RemoveChannel(channel)
if err != nil {
log.Warn("failed to remove channel", zap.String("channel", channel), zap.Error(err))
}
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
}*/
if req.GetFlushed() {
s.segmentManager.DropSegment(ctx, req.SegmentID)

View File

@ -35,6 +35,7 @@ type compactionExecutor struct {
parallelCh chan struct{}
executing sync.Map // planID to compactor
taskCh chan compactor
dropped sync.Map // vchannel dropped
}
// 0.5*min(8, NumCPU/2)
@ -98,11 +99,19 @@ func (c *compactionExecutor) stopTask(planID UniqueID) {
}
}
func (c *compactionExecutor) channelValidateForCompaction(vChannelName string) bool {
// if vchannel marked dropped, compaction should not proceed
_, loaded := c.dropped.Load(vChannelName)
return !loaded
}
func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) {
c.dropped.Store(vChannelName, struct{}{})
c.executing.Range(func(key interface{}, value interface{}) bool {
if value.(*compactionTask).plan.GetChannel() == vChannelName {
if value.(compactor).getChannelName() == vChannelName {
c.stopTask(key.(UniqueID))
}
log.Warn(value.(compactor).getChannelName())
return true
})
}
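
A minimal standalone sketch of the dropped-vchannel guard added to compactionExecutor, keeping only the two sync.Map fields this hunk touches; miniExecutor and its helpers below are illustrative, not the real executor.

package main

import (
	"fmt"
	"sync"
)

// miniExecutor keeps only the fields relevant to the guard; it is a sketch, not the real compactionExecutor.
type miniExecutor struct {
	dropped   sync.Map // vchannel name -> struct{}, set once the vchannel is dropped
	executing sync.Map // planID -> cancel func of a running task (stands in for the compactor map)
}

// channelValid mirrors channelValidateForCompaction: a vchannel is valid for new
// compaction plans only while it has never been marked dropped.
func (e *miniExecutor) channelValid(vchan string) bool {
	_, dropped := e.dropped.Load(vchan)
	return !dropped
}

// stopByVChannel mirrors stopExecutingtaskByVChannelName: mark the vchannel dropped
// first (so new plans get rejected), then stop tasks already running on it.
func (e *miniExecutor) stopByVChannel(vchan string, channelOf func(planID int64) string) {
	e.dropped.Store(vchan, struct{}{})
	e.executing.Range(func(key, value interface{}) bool {
		if channelOf(key.(int64)) == vchan {
			value.(func())() // cancel the running task
			e.executing.Delete(key)
		}
		return true
	})
}

func main() {
	e := &miniExecutor{}
	e.executing.Store(int64(1), func() { fmt.Println("plan 1 cancelled") })
	e.stopByVChannel("ch1", func(int64) string { return "ch1" })
	fmt.Println(e.channelValid("ch1")) // false: later plans on ch1 are refused
	fmt.Println(e.channelValid("ch2")) // true: other vchannels are unaffected
}

DataNode.Compaction (further down) relies on the same guard: a plan whose channel fails channelValidateForCompaction is rejected with reason "channel marked invalid" before any task is created.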

View File

@ -18,7 +18,10 @@ package datanode
import (
"context"
"sync"
"testing"
"github.com/stretchr/testify/assert"
)
func TestCompactionExecutor(t *testing.T) {
@ -64,24 +67,83 @@ func TestCompactionExecutor(t *testing.T) {
}
})
t.Run("Test channel valid check", func(t *testing.T) {
tests := []struct {
expected bool
channel string
desc string
}{
{expected: true, channel: "ch1", desc: "not in dropped"},
{expected: false, channel: "ch2", desc: "in dropped"},
}
ex := newCompactionExecutor()
ex.stopExecutingtaskByVChannelName("ch2")
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
assert.Equal(t, test.expected, ex.channelValidateForCompaction(test.channel))
})
}
})
t.Run("test stop vchannel tasks", func(t *testing.T) {
ex := newCompactionExecutor()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go ex.start(ctx)
mc := newMockCompactor(true)
mc.alwaysWorking = true
ex.execute(mc)
// wait for task enqueued
found := false
for !found {
ex.executing.Range(func(key, value interface{}) bool {
found = true
return true
})
}
ex.stopExecutingtaskByVChannelName("mock")
select {
case <-mc.ctx.Done():
default:
t.FailNow()
}
})
}
func newMockCompactor(isvalid bool) *mockCompactor {
return &mockCompactor{isvalid: isvalid}
ctx, cancel := context.WithCancel(context.TODO())
return &mockCompactor{
ctx: ctx,
cancel: cancel,
isvalid: isvalid,
}
}
type mockCompactor struct {
ctx context.Context
cancel context.CancelFunc
isvalid bool
sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
isvalid bool
alwaysWorking bool
}
var _ compactor = (*mockCompactor)(nil)
func (mc *mockCompactor) compact() error {
mc.Add(1)
defer mc.Done()
if !mc.isvalid {
return errStart
}
if mc.alwaysWorking {
<-mc.ctx.Done()
return mc.ctx.Err()
}
return nil
}
@ -92,9 +154,14 @@ func (mc *mockCompactor) getPlanID() UniqueID {
func (mc *mockCompactor) stop() {
if mc.cancel != nil {
mc.cancel()
mc.Wait()
}
}
func (mc *mockCompactor) getCollection() UniqueID {
return 1
}
func (mc *mockCompactor) getChannelName() string {
return "mock"
}

View File

@ -52,6 +52,7 @@ type compactor interface {
stop()
getPlanID() UniqueID
getCollection() UniqueID
getChannelName() string
}
// make sure compactionTask implements compactor interface
@ -70,6 +71,8 @@ type compactionTask struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// check if compactionTask implements compactor
@ -102,12 +105,17 @@ func newCompactionTask(
func (t *compactionTask) stop() {
t.cancel()
t.wg.Wait()
}
func (t *compactionTask) getPlanID() UniqueID {
return t.plan.GetPlanID()
}
func (t *compactionTask) getChannelName() string {
return t.plan.GetChannel()
}
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[UniqueID]Timestamp, *DelDataBuf, error) {
dCodec := storage.NewDeleteCodec()
@ -257,6 +265,8 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
}
func (t *compactionTask) compact() error {
t.wg.Add(1)
defer t.wg.Done()
ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()
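
The new wg field pairs compact() with stop(): compact() enters the WaitGroup for its whole run, and stop() cancels the context and then waits, so stopTask (and stopExecutingtaskByVChannelName above) only returns once the task has really exited. Below is a minimal sketch of that cancel-and-wait pattern with an illustrative task type, not the real compactionTask.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// task keeps only the cancel-and-wait shutdown pattern; it is not the real compactionTask.
type task struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newTask() *task {
	ctx, cancel := context.WithCancel(context.Background())
	return &task{ctx: ctx, cancel: cancel}
}

// run mirrors compact(): hold the WaitGroup for the whole duration of the work,
// so stop() can block until the task has actually returned.
func (t *task) run() error {
	t.wg.Add(1)
	defer t.wg.Done()
	select {
	case <-t.ctx.Done(): // cancelled by stop()
		return t.ctx.Err()
	case <-time.After(50 * time.Millisecond): // stands in for the merge/flush work
		return nil
	}
}

// stop mirrors stop(): cancel first, then wait until run() has observed the cancellation.
func (t *task) stop() {
	t.cancel()
	t.wg.Wait()
}

func main() {
	t := newTask()
	go func() { fmt.Println("run returned:", t.run()) }()
	time.Sleep(10 * time.Millisecond) // let run() enter the WaitGroup
	t.stop()                          // returns only after run() exits
	fmt.Println("stopped")
}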

View File

@ -274,7 +274,9 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
case clientv3.EventTypeDelete:
// guaranteed there is no "/" in channel name
parts := strings.Split(string(evt.Kv.Key), "/")
node.ReleaseDataSyncService(parts[len(parts)-1])
vchanName := parts[len(parts)-1]
log.Warn("handle channel delete event", zap.Int64("node id", Params.NodeID), zap.String("vchannel", vchanName))
node.ReleaseDataSyncService(vchanName)
}
}
@ -333,7 +335,7 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
flushCh := make(chan flushMsg, 100)
dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv)
dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv, node.compactionExecutor)
if err != nil {
return err
}
@ -357,7 +359,6 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
select {
case vChan := <-vChannelCh:
log.Info("GC flowgraph", zap.String("vChan", vChan))
node.stopCompactionOfVChannel(vChan)
node.ReleaseDataSyncService(vChan)
case <-node.ctx.Done():
log.Info("DataNode ctx done")
@ -741,12 +742,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
}, nil
}
func (node *DataNode) stopCompactionOfVChannel(vChan string) {
log.Debug("Stop compaction of vChannel", zap.String("vChannelName", vChan))
node.compactionExecutor.stopExecutingtaskByVChannelName(vChan)
}
func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -759,6 +754,12 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
return status, nil
}
if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) {
log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channel name", req.GetChannel()))
status.Reason = "channel marked invalid"
return status, nil
}
binlogIO := &binlogIO{node.blobKv, ds.idAllocator}
task := newCompactionTask(
node.ctx,

View File

@ -47,6 +47,7 @@ type dataSyncService struct {
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
flushManager flushManager // flush manager handles flush process
blobKV kv.BaseKV
compactor *compactionExecutor // reference to compaction executor
}
func newDataSyncService(ctx context.Context,
@ -59,7 +60,7 @@ func newDataSyncService(ctx context.Context,
dataCoord types.DataCoord,
flushingSegCache *Cache,
blobKV kv.BaseKV,
compactor *compactionExecutor,
) (*dataSyncService, error) {
if replica == nil {
@ -82,6 +83,7 @@ func newDataSyncService(ctx context.Context,
clearSignal: clearSignal,
flushingSegCache: flushingSegCache,
blobKV: blobKV,
compactor: compactor,
}
if err := service.initNodes(vchan); err != nil {
@ -212,7 +214,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err
}
var ddNode Node = newDDNode(dsService.ctx, dsService.collectionID, vchanInfo, dsService.msFactory)
var ddNode Node = newDDNode(dsService.ctx, dsService.collectionID, vchanInfo, dsService.msFactory, dsService.compactor)
var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,

View File

@ -148,6 +148,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
df,
newCache(),
memkv.NewMemoryKV(),
newCompactionExecutor(),
)
if !test.isValidCase {
@ -224,7 +225,7 @@ func TestDataSyncService_Start(t *testing.T) {
}
signalCh := make(chan string, 100)
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), memkv.NewMemoryKV())
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), memkv.NewMemoryKV(), newCompactionExecutor())
assert.Nil(t, err)
// sync.replica.addCollection(collMeta.ID, collMeta.Schema)

View File

@ -61,8 +61,9 @@ type ddNode struct {
droppedSegments []*datapb.SegmentInfo
vchannelName string
deltaMsgStream msgstream.MsgStream
dropMode atomic.Value
deltaMsgStream msgstream.MsgStream
dropMode atomic.Value
compactionExecutor *compactionExecutor
}
// Name returns node name, implementing flowgraph.Node
@ -110,7 +111,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
dropCollection: false,
}
forwardMsgs := make([]msgstream.TsMsg, 0)
var forwardMsgs []msgstream.TsMsg
for _, msg := range msMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
@ -119,6 +120,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
zap.Any("collectionID", ddn.collectionID),
zap.String("vChannelName", ddn.vchannelName))
ddn.dropMode.Store(true)
log.Debug("Stop compaction of vChannel", zap.String("vChannelName", ddn.vchannelName))
ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vchannelName)
fgMsg.dropCollection = true
}
case commonpb.MsgType_Insert:
@ -257,7 +261,8 @@ func (ddn *ddNode) Close() {
}
}
func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelInfo, msFactory msgstream.Factory) *ddNode {
func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelInfo,
msFactory msgstream.Factory, compactor *compactionExecutor) *ddNode {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(Params.FlowGraphMaxQueueLength)
baseNode.SetMaxParallelism(Params.FlowGraphMaxParallelism)
@ -288,12 +293,13 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
deltaMsgStream.Start()
dd := &ddNode{
BaseNode: baseNode,
collectionID: collID,
flushedSegments: fs,
droppedSegments: vchanInfo.GetDroppedSegments(),
vchannelName: vchanInfo.ChannelName,
deltaMsgStream: deltaMsgStream,
BaseNode: baseNode,
collectionID: collID,
flushedSegments: fs,
droppedSegments: vchanInfo.GetDroppedSegments(),
vchannelName: vchanInfo.ChannelName,
deltaMsgStream: deltaMsgStream,
compactionExecutor: compactor,
}
dd.dropMode.Store(false)
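
With the executor injected through newDDNode, stopping compaction now happens where the DropCollection message is first observed instead of in BackGroundGC. Below is a minimal sketch of that drop-mode latch, with ddnSketch as an illustrative stand-in for ddNode; the inDropMode check is an assumed consumer of the latch, not something shown in this hunk.

package main

import (
	"fmt"
	"sync/atomic"
)

// ddnSketch keeps only the drop-mode latch; the real ddNode also filters and forwards messages.
type ddnSketch struct {
	vchannelName string
	dropMode     atomic.Value // holds a bool; false until a DropCollection is seen
	stopCompact  func(vchan string)
}

// onDropCollection mirrors the DropCollection branch of Operate: latch drop mode,
// stop compaction on this vchannel through the injected executor, and report that
// the outgoing flowgraph message should carry dropCollection = true.
func (d *ddnSketch) onDropCollection() (dropCollection bool) {
	d.dropMode.Store(true)
	d.stopCompact(d.vchannelName)
	return true
}

// inDropMode is how downstream logic would consume the latch; this check is an
// assumption for the sketch and does not appear in the hunk above.
func (d *ddnSketch) inDropMode() bool {
	v, ok := d.dropMode.Load().(bool)
	return ok && v
}

func main() {
	d := &ddnSketch{
		vchannelName: "ch1",
		stopCompact:  func(v string) { fmt.Println("stop compaction on", v) },
	}
	d.dropMode.Store(false)
	fmt.Println("drop mode before:", d.inDropMode())
	fmt.Println("fgMsg.dropCollection:", d.onDropCollection())
	fmt.Println("drop mode after:", d.inDropMode())
}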

View File

@ -82,6 +82,7 @@ func TestFlowGraph_DDNode_newDDNode(te *testing.T) {
ChannelName: "by-dev-rootcoord-dml-test",
},
mmf,
newCompactionExecutor(),
)
require.NotNil(t, ddNode)
@ -146,8 +147,10 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) {
deltaStream, err := factory.NewMsgStream(context.Background())
assert.Nil(t, err)
ddn := ddNode{
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
collectionID: test.ddnCollID,
deltaMsgStream: deltaStream,
vchannelName: "ddn_drop_msg",
compactionExecutor: newCompactionExecutor(),
}
var dropCollMsg msgstream.TsMsg = &msgstream.DropCollectionMsg{

View File

@ -233,6 +233,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
}
if fgMsg.dropCollection {
dn.flushManager.notifyAllFlushed()
log.Debug("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName))
dn.clearSignal <- dn.channelName
}
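
Together with the ddNode and insertBufferNode hunks, the dropCollection flag now flows ddNode -> insertBufferNode.startDropping -> deleteNode.notifyAllFlushed -> clearSignal -> BackGroundGC, which releases the data sync service. A toy end-to-end sketch of that signal chain follows; all types and names here are illustrative.

package main

import "fmt"

// toyFlushManager stands in for the flush manager's drop-mode switches.
type toyFlushManager struct{ dropping, allFlushed bool }

func (f *toyFlushManager) startDropping()    { f.dropping = true }
func (f *toyFlushManager) notifyAllFlushed() { f.allFlushed = true }

func main() {
	fm := &toyFlushManager{}
	clearSignal := make(chan string, 1)
	vchan := "ch1"

	// insertBufferNode.Operate on a message with dropCollection set (hunk further below)
	fm.startDropping()

	// deleteNode.Operate on the same flowgraph message (hunk above)
	fm.notifyAllFlushed()
	clearSignal <- vchan

	// DataNode.BackGroundGC then releases the data sync service for this vchannel
	released := <-clearSignal
	fmt.Println("dropping:", fm.dropping, "allFlushed:", fm.allFlushed, "released:", released)
}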

View File

@ -282,4 +282,39 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
// send again shall trigger empty buffer flush
delNode.Operate([]flowgraph.Msg{fgMsg})
})
t.Run("Test deleteNode Operate valid with dropCollection", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
chanName := "datanode-test-FlowGraphDeletenode-operate"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.MetaRootPath = testPath
Params.DeleteBinlogRootPath = testPath
c := &nodeConfig{
replica: replica,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
sig := make(chan string, 1)
delNode, err := newDeleteNode(ctx, fm, sig, c)
assert.Nil(t, err)
msg := genFlowGraphDeleteMsg(pks, chanName)
msg.segmentsToFlush = segIDs
msg.endPositions[0].Timestamp = 100 // set to normal timestamp
msg.dropCollection = true
assert.NotPanics(t, func() {
fm.startDropping()
delNode.Operate([]flowgraph.Msg{&msg})
})
timer := time.NewTimer(time.Millisecond)
select {
case <-timer.C:
t.FailNow()
case <-sig:
}
})
}

View File

@ -179,6 +179,10 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
return []Msg{}
}
if fgMsg.dropCollection {
ibNode.flushManager.startDropping()
}
var spans []opentracing.Span
for _, msg := range fgMsg.insertMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())

View File

@ -204,8 +204,12 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
}
inMsg := genFlowGraphInsertMsg(insertChannelName)
var fgMsg flowgraph.Msg = &inMsg
iBNode.Operate([]flowgraph.Msg{fgMsg})
assert.NotPanics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
// test drop collection operate
inMsg = genFlowGraphInsertMsg(insertChannelName)
inMsg.dropCollection = true
assert.NotPanics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
}
/*

View File

@ -496,6 +496,16 @@ func (m *rendezvousFlushManager) startDropping() {
m.dropHandler.dropFlushWg.Wait() // waits for all drop mode task done
m.dropHandler.Lock()
defer m.dropHandler.Unlock()
// apply injection if any
for _, pack := range m.dropHandler.packs {
q := m.getFlushQueue(pack.segmentID)
// queue will never be nil, since getFlushQueue will initialize one if not found
q.injectMut.Lock()
if q.postInjection != nil {
q.postInjection(pack)
}
q.injectMut.Unlock()
}
m.dropHandler.flushAndDrop(m.dropHandler.packs) // invoke drop & flush
}()
}
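
A minimal sketch of the injection step added at the end of startDropping: before the buffered packs go to flushAndDrop, each queue's postInjection is applied, so an injected rewrite (for example the segmentID override exercised in the test below) still takes effect in drop mode. The types here are simplified stand-ins for the rendezvousFlushManager internals, not the real ones.

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the flush manager internals touched by this hunk.
type segmentFlushPack struct{ segmentID int64 }

type flushQueue struct {
	injectMut     sync.Mutex
	postInjection func(*segmentFlushPack)
}

type dropBuffer struct {
	sync.Mutex
	packs        []*segmentFlushPack
	flushAndDrop func([]*segmentFlushPack)
}

// finishDrop mirrors the tail of startDropping: before the buffered packs are handed
// to flushAndDrop, apply each queue's postInjection so injected rewrites are not lost in drop mode.
func finishDrop(b *dropBuffer, getQueue func(segID int64) *flushQueue) {
	b.Lock()
	defer b.Unlock()
	for _, pack := range b.packs {
		q := getQueue(pack.segmentID) // never nil: the real getFlushQueue creates one on demand
		q.injectMut.Lock()
		if q.postInjection != nil {
			q.postInjection(pack)
		}
		q.injectMut.Unlock()
	}
	b.flushAndDrop(b.packs)
}

func main() {
	q := &flushQueue{postInjection: func(p *segmentFlushPack) { p.segmentID = 100 }}
	b := &dropBuffer{
		packs:        []*segmentFlushPack{{segmentID: 1}, {segmentID: 2}},
		flushAndDrop: func(packs []*segmentFlushPack) { fmt.Println("drop-flushing", len(packs), "packs") },
	}
	finishDrop(b, func(int64) *flushQueue { return q })
	fmt.Println(b.packs[0].segmentID, b.packs[1].segmentID) // both rewritten to 100 by the injection
}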

View File

@ -364,56 +364,115 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) {
}
func TestRendezvousFlushManager_dropMode(t *testing.T) {
kv := memkv.NewMemoryKV()
t.Run("test drop mode", func(t *testing.T) {
kv := memkv.NewMemoryKV()
var mut sync.Mutex
var result []*segmentFlushPack
signal := make(chan struct{})
var mut sync.Mutex
var result []*segmentFlushPack
signal := make(chan struct{})
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
}, func(packs []*segmentFlushPack) {
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
}, func(packs []*segmentFlushPack) {
mut.Lock()
result = packs
mut.Unlock()
close(signal)
})
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
m.startDropping()
// half normal, half drop mode, should not appear in final packs
m.flushDelData(nil, -1, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
target := make(map[int64]struct{})
for i := 1; i < 11; i++ {
target[int64(i)] = struct{}{}
m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
MsgID: []byte{1},
})
m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
MsgID: []byte{1},
})
}
m.notifyAllFlushed()
<-signal
mut.Lock()
result = packs
mut.Unlock()
close(signal)
})
defer mut.Unlock()
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
MsgID: halfMsgID,
output := make(map[int64]struct{})
for _, pack := range result {
assert.NotEqual(t, -1, pack.segmentID)
output[pack.segmentID] = struct{}{}
_, has := target[pack.segmentID]
assert.True(t, has)
}
assert.Equal(t, len(target), len(output))
})
t.Run("test drop mode with injection", func(t *testing.T) {
kv := memkv.NewMemoryKV()
m.startDropping()
// half normal, half drop mode, should not appear in final packs
m.flushDelData(nil, -1, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
var mut sync.Mutex
var result []*segmentFlushPack
signal := make(chan struct{})
target := make(map[int64]struct{})
for i := 1; i < 11; i++ {
target[int64(i)] = struct{}{}
m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
MsgID: []byte{1},
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
}, func(packs []*segmentFlushPack) {
mut.Lock()
result = packs
mut.Unlock()
close(signal)
})
m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
MsgID: []byte{1},
halfMsgID := []byte{1, 1, 1}
m.flushBufferData(nil, -1, true, false, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
}
m.notifyAllFlushed()
injFunc := func(pack *segmentFlushPack) {
pack.segmentID = 100
}
for i := 1; i < 11; i++ {
it := newTaskInjection(1, injFunc)
m.injectFlush(it, int64(i))
<-it.Injected()
it.injectDone(true)
}
<-signal
mut.Lock()
defer mut.Unlock()
m.startDropping()
// half normal, half drop mode, should not appear in final packs
m.flushDelData(nil, -1, &internalpb.MsgPosition{
MsgID: halfMsgID,
})
for i := 1; i < 11; i++ {
m.flushBufferData(nil, int64(i), true, false, &internalpb.MsgPosition{
MsgID: []byte{1},
})
m.flushDelData(nil, int64(i), &internalpb.MsgPosition{
MsgID: []byte{1},
})
}
m.notifyAllFlushed()
<-signal
mut.Lock()
defer mut.Unlock()
for _, pack := range result {
assert.NotEqual(t, -1, pack.segmentID)
assert.Equal(t, int64(100), pack.segmentID)
}
})
output := make(map[int64]struct{})
for _, pack := range result {
assert.NotEqual(t, -1, pack.segmentID)
output[pack.segmentID] = struct{}{}
_, has := target[pack.segmentID]
assert.True(t, has)
}
assert.Equal(t, len(target), len(output))
}
func TestRendezvousFlushManager_close(t *testing.T) {