Fix DN GC bug (#12258)

Fixes: #12249

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2021-11-25 09:43:15 +08:00 committed by GitHub
parent f1904ad3cc
commit 8a8ebed23d
9 changed files with 83 additions and 57 deletions
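In short, this patch re-keys the DataNode's background GC from collection ID to vchannel name: when a flowgraph's deleteNode consumes a DropCollection message it now sends its own channel name on clearSignal, and BackGroundGC stops that channel's compaction tasks and releases its data sync service directly, instead of first resolving every channel belonging to the collection. A minimal, self-contained sketch of the new flow (gcLoop, stopCompaction, and releaseChannel are illustrative stand-ins, not the actual Milvus API):

package main

import (
	"context"
	"fmt"
)

// gcLoop mirrors the new BackGroundGC: it consumes vchannel names from
// clearSignal and releases per-channel resources until ctx is cancelled.
func gcLoop(ctx context.Context, clearSignal <-chan string) {
	for {
		select {
		case vchan := <-clearSignal:
			fmt.Println("GC flowgraph for vchannel:", vchan)
			stopCompaction(vchan) // cf. node.stopCompactionOfVChannel(vChan)
			releaseChannel(vchan) // cf. node.ReleaseDataSyncService(vChan)
		case <-ctx.Done():
			return
		}
	}
}

// Illustrative stand-ins for the DataNode/compactionExecutor methods.
func stopCompaction(vchan string) {}
func releaseChannel(vchan string) {}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	clearSignal := make(chan string) // unbuffered here so the demo is deterministic
	done := make(chan struct{})
	go func() { gcLoop(ctx, clearSignal); close(done) }()

	// A deleteNode that consumed a DropCollection message sends its own
	// channel name, as in `dn.clearSignal <- dn.channelName` below.
	clearSignal <- "by-dev-rootcoord-dml-0"
	cancel()
	<-done
}

Keying the signal by channel name means one drop signal releases exactly one flowgraph, which is what the per-file diffs below implement.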

View File

@@ -98,12 +98,11 @@ func (c *compactionExecutor) stopTask(planID UniqueID) {
}
}
func (c *compactionExecutor) stopExecutingtaskByCollectionID(collID UniqueID) {
func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) {
c.executing.Range(func(key interface{}, value interface{}) bool {
if value.(compactor).getCollection() == collID {
if value.(*compactionTask).plan.GetChannel() == vChannelName {
c.stopTask(key.(UniqueID))
}
return true
})
}
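The executor above tracks running compaction tasks in a sync.Map keyed by plan ID, so stopping everything bound to one vchannel is a Range with a type assertion and a predicate. A stripped-down sketch of that pattern (the task struct and stop logic are placeholders, not the real compactionTask):

package main

import (
	"fmt"
	"sync"
)

type task struct{ channel string } // placeholder for *compactionTask

type executor struct{ executing sync.Map } // planID (int64) -> *task

func (e *executor) stopTask(planID int64) {
	e.executing.Delete(planID)
	fmt.Println("stopped plan", planID)
}

// stopByVChannel mirrors stopExecutingtaskByVChannelName: range over the
// map and stop every task whose plan targets the given vchannel.
func (e *executor) stopByVChannel(vchan string) {
	e.executing.Range(func(key, value interface{}) bool {
		if value.(*task).channel == vchan {
			e.stopTask(key.(int64))
		}
		return true // keep iterating; several plans may share the channel
	})
}

func main() {
	e := &executor{}
	e.executing.Store(int64(1), &task{channel: "dml-0"})
	e.executing.Store(int64(2), &task{channel: "dml-1"})
	e.stopByVChannel("dml-0") // stops plan 1 only
}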

View File

@@ -106,7 +106,7 @@ type DataNode struct {
vchan2SyncService map[string]*dataSyncService // vchannel name
vchan2FlushChs map[string]chan flushMsg // vchannel name to flush channels
clearSignal chan UniqueID // collection ID
clearSignal chan string // vchannel name
segmentCache *Cache
compactionExecutor *compactionExecutor
@@ -139,7 +139,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
vchan2SyncService: make(map[string]*dataSyncService),
vchan2FlushChs: make(map[string]chan flushMsg),
clearSignal: make(chan UniqueID, 100),
clearSignal: make(chan string, 100),
}
node.UpdateStateCode(internalpb.StateCode_Abnormal)
return node
@@ -351,16 +351,14 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
}
// BackGroundGC runs in background to release datanode resources
func (node *DataNode) BackGroundGC(collIDCh <-chan UniqueID) {
func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
log.Info("DataNode Background GC Start")
for {
select {
case collID := <-collIDCh:
log.Info("GC collection", zap.Int64("ID", collID))
node.stopCompactionOfCollection(collID)
for _, vchanName := range node.getChannelNamesbyCollectionID(collID) {
node.ReleaseDataSyncService(vchanName)
}
case vChan := <-vChannelCh:
log.Info("GC flowgraph", zap.String("vChan", vChan))
node.stopCompactionOfVChannel(vChan)
node.ReleaseDataSyncService(vChan)
case <-node.ctx.Done():
log.Info("DataNode ctx done")
return
@@ -739,10 +737,10 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
}, nil
}
func (node *DataNode) stopCompactionOfCollection(collID UniqueID) {
log.Debug("Stop compaction of collection", zap.Int64("collection ID", collID))
func (node *DataNode) stopCompactionOfVChannel(vChan string) {
log.Debug("Stop compaction of vChannel", zap.String("vChannelName", vChan))
node.compactionExecutor.stopExecutingtaskByCollectionID(collID)
node.compactionExecutor.stopExecutingtaskByVChannelName(vChan)
}
func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {

View File

@@ -358,32 +358,33 @@ func TestDataNode(t *testing.T) {
zap.String("response", resp.Response))
})
t.Run("Test BackGroundGC", func(te *testing.T) {
t.Run("Test BackGroundGC", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
node := newIDLEDataNodeMock(ctx)
collIDCh := make(chan UniqueID)
node.clearSignal = collIDCh
go node.BackGroundGC(collIDCh)
vchanNameCh := make(chan string)
node.clearSignal = vchanNameCh
go node.BackGroundGC(vchanNameCh)
testDataSyncs := []struct {
collID UniqueID
dmChannelName string
}{
{1, "fake-by-dev-rootcoord-dml-backgroundgc-1"},
{2, "fake-by-dev-rootcoord-dml-backgroundgc-2"},
{3, "fake-by-dev-rootcoord-dml-backgroundgc-3"},
{4, ""},
{1, ""},
{"fake-by-dev-rootcoord-dml-backgroundgc-1"},
{"fake-by-dev-rootcoord-dml-backgroundgc-2"},
{"fake-by-dev-rootcoord-dml-backgroundgc-3"},
{""},
{""},
}
for i, t := range testDataSyncs {
for i, test := range testDataSyncs {
if i <= 2 {
err = node.NewDataSyncService(&datapb.VchannelInfo{CollectionID: t.collID, ChannelName: t.dmChannelName})
assert.Nil(te, err)
}
collIDCh <- t.collID
err = node.NewDataSyncService(&datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName})
assert.Nil(t, err)
vchanNameCh <- test.dmChannelName
}
}
assert.Eventually(t, func() bool {
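The test above fires vchannel names at the GC goroutine and then, per the trailing assert.Eventually, polls until the background work becomes observable. A reduced sketch of that test pattern using testify's assert.Eventually (the services map stands in for DataNode.vchan2SyncService):

package gc_test

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// TestGCReleasesService shows the polling pattern the DataNode test ends
// with: send the signal, then poll until the background goroutine has
// acted, instead of sleeping for a fixed interval.
func TestGCReleasesService(t *testing.T) {
	var mu sync.Mutex
	services := map[string]bool{"dml-0": true}

	clearSignal := make(chan string, 1)
	go func() { // stand-in for BackGroundGC
		vchan := <-clearSignal
		mu.Lock()
		delete(services, vchan)
		mu.Unlock()
	}()

	clearSignal <- "dml-0"

	assert.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(services) == 0
	}, time.Second, 10*time.Millisecond)
}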

View File

@@ -39,9 +39,10 @@ type dataSyncService struct {
replica Replica // segment replica stores meta
idAllocator allocatorInterface // id/timestamp allocator
msFactory msgstream.Factory
collectionID UniqueID // collection id of vchan for which this data sync service serves
collectionID UniqueID // collection id of vchan for which this data sync service serves
vchannelName string
dataCoord types.DataCoord // DataCoord instance to interact with
clearSignal chan<- UniqueID // signal channel to notify flowgraph close for collection/partition drop msg consumed
clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
flushManager flushManager // flush manager handles flush process
@@ -54,7 +55,7 @@ func newDataSyncService(ctx context.Context,
alloc allocatorInterface,
factory msgstream.Factory,
vchan *datapb.VchannelInfo,
clearSignal chan<- UniqueID,
clearSignal chan<- string,
dataCoord types.DataCoord,
flushingSegCache *Cache,
blobKV kv.BaseKV,
@@ -76,6 +77,7 @@ func newDataSyncService(ctx context.Context,
idAllocator: alloc,
msFactory: factory,
collectionID: vchan.GetCollectionID(),
vchannelName: vchan.GetChannelName(),
dataCoord: dataCoord,
clearSignal: clearSignal,
flushingSegCache: flushingSegCache,
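Note that dataSyncService holds clearSignal as a send-only chan<- string while BackGroundGC receives from a <-chan string: the single bidirectional channel created in NewDataNode is narrowed to the right capability on each side. A tiny illustration of Go's directional channel types (names are illustrative):

package main

import "fmt"

// producer holds the channel send-only, as dataSyncService does with
// `clearSignal chan<- string`.
type producer struct{ clearSignal chan<- string }

func (p *producer) drop(vchan string) { p.clearSignal <- vchan }

// consume takes the receive-only view, as BackGroundGC's parameter does.
func consume(ch <-chan string) { fmt.Println("GC:", <-ch) }

func main() {
	ch := make(chan string, 1)      // bidirectional at creation
	p := &producer{clearSignal: ch} // narrowed to send-only here
	p.drop("dml-0")                 // buffered, so the send does not block
	consume(ch)                     // in the real code this is the long-lived GC loop
}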

View File

@@ -144,7 +144,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
NewAllocatorFactory(),
test.inMsgFactory,
getVchanInfo(test),
make(chan UniqueID),
make(chan string),
df,
newCache(),
memkv.NewMemoryKV(),
@@ -223,7 +223,7 @@ func TestDataSyncService_Start(t *testing.T) {
FlushedSegments: fs,
}
signalCh := make(chan UniqueID, 100)
signalCh := make(chan string, 100)
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataCoordFactory{}, newCache(), memkv.NewMemoryKV())
assert.Nil(t, err)

View File

@@ -115,17 +115,18 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
log.Info("Receiving DropCollection msg", zap.Any("collectionID", ddn.collectionID))
log.Info("Receiving DropCollection msg",
zap.Any("collectionID", ddn.collectionID),
zap.String("vChannelName", ddn.vchannelName))
ddn.dropMode.Store(true)
fgMsg.dropCollection = true
}
case commonpb.MsgType_Insert:
log.Debug("DDNode receive insert messages")
imsg := msg.(*msgstream.InsertMsg)
if imsg.CollectionID != ddn.collectionID {
//log.Debug("filter invalid InsertMsg, collection mis-match",
// zap.Int64("Get msg collID", imsg.CollectionID),
// zap.Int64("Expected collID", ddn.collectionID))
log.Warn("filter invalid InsertMsg, collection mis-match",
zap.Int64("Get collID", imsg.CollectionID),
zap.Int64("Expected collID", ddn.collectionID))
continue
}
if msg.EndTs() < FilterThreshold {
@@ -137,6 +138,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
continue
}
}
log.Debug("DDNode receive insert messages",
zap.Int("numRows", len(imsg.GetRowIDs())),
zap.String("vChannelName", ddn.vchannelName))
fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)
case commonpb.MsgType_Delete:
log.Debug("DDNode receive delete messages")
@@ -146,9 +150,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
}
forwardMsgs = append(forwardMsgs, dmsg)
if dmsg.CollectionID != ddn.collectionID {
//log.Debug("filter invalid DeleteMsg, collection mis-match",
// zap.Int64("Get msg collID", dmsg.CollectionID),
// zap.Int64("Expected collID", ddn.collectionID))
log.Warn("filter invalid DeleteMsg, collection mis-match",
zap.Int64("Get collID", dmsg.CollectionID),
zap.Int64("Expected collID", ddn.collectionID))
continue
}
fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg)
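For context, ddNode.Operate acts as a filter: a DropCollection message for this node's collection latches drop mode, and Insert/Delete messages from other collections are discarded with the warnings that the hunks above un-comment. A reduced sketch of that decision logic (message types simplified; not the real Msg interface):

package main

import (
	"fmt"
	"sync/atomic"
)

type msg struct {
	typ    string // "drop", "insert" or "delete"
	collID int64
}

// node mirrors ddNode's filtering state: the collection it serves plus a
// latch flipped once a DropCollection message is consumed.
type node struct {
	collectionID int64
	dropMode     atomic.Value
}

// operate keeps only messages for this node's collection and latches drop
// mode on DropCollection, as ddNode.Operate does.
func (n *node) operate(in []msg) (out []msg) {
	for _, m := range in {
		switch m.typ {
		case "drop":
			if m.collID == n.collectionID {
				n.dropMode.Store(true)
			}
		case "insert", "delete":
			if m.collID != n.collectionID {
				fmt.Println("filter mis-matched collID:", m.collID)
				continue
			}
			out = append(out, m)
		}
	}
	return out
}

func main() {
	n := &node{collectionID: 1}
	out := n.operate([]msg{{"insert", 1}, {"insert", 2}, {"drop", 1}})
	fmt.Println("kept:", len(out), "dropMode:", n.dropMode.Load())
}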

View File

@@ -45,7 +45,7 @@ type deleteNode struct {
idAllocator allocatorInterface
flushManager flushManager
clearSignal chan<- UniqueID
clearSignal chan<- string
}
// DelDataBuf buffers insert data, monitoring buffer size and limit
@@ -99,7 +99,9 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
for i, pk := range msg.PrimaryKeys {
segIDs, ok := m[pk]
if !ok {
log.Warn("primary key not exist in all segments", zap.Int64("primary key", pk))
log.Warn("primary key not exist in all segments",
zap.Int64("primary key", pk),
zap.String("vChannelName", dn.channelName))
continue
}
for _, segID := range segIDs {
@@ -112,7 +114,9 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
rows := len(pks)
tss, ok := segIDToTsMap[segID]
if !ok || rows != len(tss) {
// TODO: what's the expected behavior after this Error?
log.Error("primary keys and timestamp's element num mis-match")
continue
}
var delDataBuf *DelDataBuf
@@ -127,7 +131,11 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
for i := 0; i < rows; i++ {
delData.Pks = append(delData.Pks, pks[i])
delData.Tss = append(delData.Tss, tss[i])
log.Debug("delete", zap.Int64("primary key", pks[i]), zap.Uint64("ts", tss[i]))
log.Debug("delete",
zap.Int64("primary key", pks[i]),
zap.Uint64("ts", tss[i]),
zap.Int64("segmentID", segID),
zap.String("vChannelName", dn.channelName))
}
// store
@@ -145,13 +153,24 @@ func (dn *deleteNode) showDelBuf() {
segID := seg.segmentID
if v, ok := dn.delBuf.Load(segID); ok {
delDataBuf, _ := v.(*DelDataBuf)
log.Debug("del data buffer status", zap.Int64("segID", segID), zap.Int64("size", delDataBuf.size))
log.Debug("delta buffer status",
zap.Int64("segID", segID),
zap.Int64("size", delDataBuf.size),
zap.String("vchannel", dn.channelName))
// TODO control the printed length
length := len(delDataBuf.delData.Pks)
for i := 0; i < length; i++ {
log.Debug("del data", zap.Int64("pk", delDataBuf.delData.Pks[i]), zap.Uint64("ts", delDataBuf.delData.Tss[i]))
log.Debug("del data",
zap.Int64("pk", delDataBuf.delData.Pks[i]),
zap.Uint64("ts", delDataBuf.delData.Tss[i]),
zap.Int64("segmentID", segID),
zap.String("vchannel", dn.channelName),
)
}
} else {
log.Error("segment not exist", zap.Int64("segID", segID))
log.Error("segment not exist",
zap.Int64("segID", segID),
zap.String("vchannel", dn.channelName))
}
}
}
@@ -193,7 +212,9 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
// handle flush
if len(fgMsg.segmentsToFlush) > 0 {
log.Debug("DeleteNode receives flush message", zap.Int64s("segIDs", fgMsg.segmentsToFlush))
log.Debug("DeleteNode receives flush message",
zap.Int64s("segIDs", fgMsg.segmentsToFlush),
zap.String("vChannelName", dn.channelName))
for _, segmentToFlush := range fgMsg.segmentsToFlush {
buf, ok := dn.delBuf.Load(segmentToFlush)
if !ok {
@@ -212,8 +233,8 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
}
if fgMsg.dropCollection {
log.Debug("DeleteNode reveives dropCollection signal")
dn.clearSignal <- dn.replica.getCollectionID()
log.Debug("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName))
dn.clearSignal <- dn.channelName
}
for _, sp := range spans {
@@ -241,7 +262,7 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []int64) map[int64]
return result
}
func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- UniqueID, config *nodeConfig) (*deleteNode, error) {
func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
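The buffering path above groups a DeleteMsg's primary keys by candidate segment and appends (pk, ts) pairs into a per-segment buffer, with the length check guarding against a malformed grouping (the TODO added above notes the still-unresolved error behavior). A reduced sketch of that bucketing (DelDataBuf trimmed to the fields used here):

package main

import (
	"fmt"
	"sync"
)

type delDataBuf struct {
	pks  []int64
	tss  []uint64
	size int64
}

// bufferDeletes mirrors bufferDeleteMsg's core: for each segment, append
// its (pk, ts) pairs into delBuf, creating the buffer on first use.
func bufferDeletes(delBuf *sync.Map, segToPks map[int64][]int64, segToTss map[int64][]uint64) {
	for segID, pks := range segToPks {
		tss, ok := segToTss[segID]
		if !ok || len(pks) != len(tss) {
			fmt.Println("pk/ts element num mis-match for segment", segID)
			continue
		}
		v, _ := delBuf.LoadOrStore(segID, &delDataBuf{})
		buf := v.(*delDataBuf)
		buf.pks = append(buf.pks, pks...)
		buf.tss = append(buf.tss, tss...)
		buf.size += int64(len(pks))
	}
}

func main() {
	var delBuf sync.Map
	bufferDeletes(&delBuf,
		map[int64][]int64{100: {7, 8}},
		map[int64][]uint64{100: {42, 43}})
	v, _ := delBuf.Load(int64(100))
	fmt.Printf("segment 100 buffered: %+v\n", v.(*delDataBuf))
}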

View File

@@ -107,7 +107,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn, err := newDeleteNode(test.ctx, nil, make(chan UniqueID, 1), test.config)
dn, err := newDeleteNode(test.ctx, nil, make(chan string, 1), test.config)
assert.Nil(t, err)
assert.NotNil(t, dn)
@@ -213,7 +213,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
vChannelName: chanName,
}
dn, err := newDeleteNode(context.Background(), fm, make(chan UniqueID, 1), c)
dn, err := newDeleteNode(context.Background(), fm, make(chan string, 1), c)
assert.Nil(t, err)
results := dn.filterSegmentByPK(0, pks)
@@ -244,7 +244,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan UniqueID, 1), c)
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(pks, chanName)
@@ -268,7 +268,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan UniqueID, 1), c)
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(pks, chanName)

View File

@@ -469,6 +469,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
zap.Int("Length of Field2Stats", len(fieldStats)),
zap.Int("Length of Field2Deltalogs", len(deltaInfos)),
zap.String("vChannelName", dsService.vchannelName),
)
req := &datapb.SaveBinlogPathsRequest{