Destroy DataNode when dropping collections (#5638)

* Destroy DataNode when dropping collections

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

* golangci-lint

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

Co-authored-by: yefu.chen <yefu.chen@zilliz.com>
XuanYang-cn 2021-06-07 11:25:37 +08:00 committed by zhenshan.cao
parent f054fc9be2
commit ac19711d74
8 changed files with 152 additions and 30 deletions

@ -2,6 +2,9 @@
update: 5.19.2021, by [Goose](https://github.com/XuanYang-cn)
update: 5.21.2021, by [Goose](https://github.com/XuanYang-cn)
update: 6.04.2021, by [Goose](https://github.com/XuanYang-cn)
**THIS IS OUTDATED**
## Background

go.mod

@ -40,7 +40,7 @@ require (
github.com/yahoo/athenz v1.9.16 // indirect
go.etcd.io/etcd v3.3.25+incompatible
go.uber.org/zap v1.15.0
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
golang.org/x/text v0.3.3

@ -64,8 +64,9 @@ type DataNode struct {
watchDm chan struct{}
chanMut sync.RWMutex
vchan2SyncService map[string]*dataSyncService
vchan2FlushCh map[string]chan<- *flushMsg
vchan2SyncService map[string]*dataSyncService // vchannel name
vchan2FlushCh map[string]chan<- *flushMsg // vchannel name
clearSignal chan UniqueID // collection ID
masterService types.MasterService
dataService types.DataService
@ -93,6 +94,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
vchan2SyncService: make(map[string]*dataSyncService),
vchan2FlushCh: make(map[string]chan<- *flushMsg),
clearSignal: make(chan UniqueID, 100),
}
node.UpdateStateCode(internalpb.StateCode_Abnormal)
return node
@ -201,7 +203,7 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
metaService := newMetaService(node.ctx, replica, node.masterService)
flushChan := make(chan *flushMsg, 100)
dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan)
dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal)
// TODO metaService using timestamp in DescribeCollection
node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService
node.vchan2FlushCh[vchan.GetChannelName()] = flushChan
@ -212,8 +214,44 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
return nil
}
// BackGroundGC runs in the background to release DataNode resources when a collection is dropped
func (node *DataNode) BackGroundGC(collIDCh <-chan UniqueID) {
log.Info("DataNode Background GC Start")
for {
select {
case collID := <-collIDCh:
log.Info("GC collection", zap.Int64("ID", collID))
for _, vchanName := range node.getChannelNamesbyCollectionID(collID) {
node.ReleaseDataSyncService(vchanName)
}
case <-node.ctx.Done():
return
}
}
}
// ReleaseDataSyncService releases flowgraph resources for a vchanName
func (node *DataNode) ReleaseDataSyncService(vchanName string) {
log.Info("Release flowgraph resources begin", zap.String("Vchannel", vchanName))
node.chanMut.Lock()
if dss, ok := node.vchan2SyncService[vchanName]; ok {
dss.close()
}
delete(node.vchan2SyncService, vchanName)
node.chanMut.Unlock()
node.chanMut.Lock()
delete(node.vchan2FlushCh, vchanName)
node.chanMut.Unlock()
log.Debug("Release flowgraph resources end", zap.String("Vchannel", vchanName))
}
// Start will update DataNode state to HEALTHY
func (node *DataNode) Start() error {
go node.BackGroundGC(node.clearSignal)
node.UpdateStateCode(internalpb.StateCode_Healthy)
return nil
}
@ -263,7 +301,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo
return states, nil
}
func (node *DataNode) getChannelName(segID UniqueID) string {
func (node *DataNode) getChannelNamebySegmentID(segID UniqueID) string {
node.chanMut.RLock()
defer node.chanMut.RUnlock()
for name, dataSync := range node.vchan2SyncService {
@ -274,6 +312,19 @@ func (node *DataNode) getChannelName(segID UniqueID) string {
return ""
}
func (node *DataNode) getChannelNamesbyCollectionID(collID UniqueID) []string {
node.chanMut.RLock()
defer node.chanMut.RUnlock()
channels := make([]string, 0, len(node.vchan2SyncService))
for name, dataSync := range node.vchan2SyncService {
if dataSync.collectionID == collID {
channels = append(channels, name)
}
}
return channels
}
// ReadyToFlush tells whether DataNode is ready for flushing
func (node *DataNode) ReadyToFlush() error {
if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
@ -328,7 +379,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs)))
dmlFlushedCh := make(chan []*datapb.ID2PathList, len(req.SegmentIDs))
for _, id := range req.SegmentIDs {
chanName := node.getChannelName(id)
chanName := node.getChannelNamebySegmentID(id)
log.Info("vchannel", zap.String("name", chanName))
if len(chanName) == 0 {
status.Reason = fmt.Sprintf("DataNode not find segment %d!", id)
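
The DataNode side of this change is a small channel-driven GC loop: ddNode pushes the dropped collection ID onto clearSignal, BackGroundGC picks it up, and every dataSyncService (flowgraph) registered for that collection is closed and unregistered. Below is a minimal, self-contained sketch of that pattern; the types and names (syncService, the services map) are simplified stand-ins for illustration, not the actual Milvus types.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type UniqueID = int64

// syncService stands in for dataSyncService: it owns a cancel function so the
// flowgraph serving one vchannel can be torn down on its own.
type syncService struct {
	collectionID UniqueID
	cancel       context.CancelFunc
}

type dataNode struct {
	ctx         context.Context
	chanMut     sync.RWMutex
	services    map[string]*syncService // vchannel name -> flowgraph service
	clearSignal chan UniqueID           // buffered (capacity 100) in the real DataNode
}

// backGroundGC mirrors DataNode.BackGroundGC: block on the clear signal and
// release every flowgraph that serves the dropped collection.
func (n *dataNode) backGroundGC() {
	for {
		select {
		case collID := <-n.clearSignal:
			n.chanMut.Lock()
			for name, s := range n.services {
				if s.collectionID == collID {
					s.cancel()
					delete(n.services, name)
					fmt.Println("released flowgraph for", name)
				}
			}
			n.chanMut.Unlock()
		case <-n.ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	node := &dataNode{
		ctx:         ctx,
		services:    make(map[string]*syncService),
		clearSignal: make(chan UniqueID, 100),
	}
	// register one hypothetical vchannel for collection 1
	_, svcCancel := context.WithCancel(ctx)
	node.services["fake-dm-channel-0"] = &syncService{collectionID: 1, cancel: svcCancel}

	go node.backGroundGC()
	node.clearSignal <- 1 // what ddNode does on MsgType_DropCollection
	time.Sleep(100 * time.Millisecond)

	node.chanMut.RLock()
	fmt.Println("remaining services:", len(node.services))
	node.chanMut.RUnlock()
}
```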

@ -15,6 +15,7 @@ import (
"math"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -116,9 +117,8 @@ func TestDataNode(t *testing.T) {
t.Run("Test FlushSegments", func(t *testing.T) {
dmChannelName := "fake-dm-channel-test-HEALTHDataNodeMock"
ddChannelName := "fake-dd-channel-test-HEALTHDataNodeMock"
node1 := newHEALTHDataNodeMock(dmChannelName, ddChannelName)
node1 := newHEALTHDataNodeMock(dmChannelName)
sync, ok := node1.vchan2SyncService[dmChannelName]
assert.True(t, ok)
@ -166,24 +166,14 @@ func TestDataNode(t *testing.T) {
insertStream, _ := msFactory.NewMsgStream(node1.ctx)
insertStream.AsProducer([]string{dmChannelName})
ddStream, _ := msFactory.NewMsgStream(node1.ctx)
ddStream.AsProducer([]string{ddChannelName})
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
_, err = sync.replica.getSegmentByID(0)
assert.NoError(t, err)
@ -204,6 +194,64 @@ func TestDataNode(t *testing.T) {
assert.NoError(t, err)
})
t.Run("Test ReleaseDataSyncService", func(t *testing.T) {
dmChannelName := "fake-dm-channel-test-NewDataSyncService"
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: dmChannelName,
CheckPoints: []*datapb.CheckPoint{},
}
err := node.NewDataSyncService(vchan)
assert.NoError(t, err)
assert.Equal(t, 1, len(node.vchan2FlushCh))
assert.Equal(t, 1, len(node.vchan2SyncService))
time.Sleep(time.Second)
node.ReleaseDataSyncService(dmChannelName)
assert.Equal(t, 0, len(node.vchan2FlushCh))
assert.Equal(t, 0, len(node.vchan2SyncService))
s, ok := node.vchan2SyncService[dmChannelName]
assert.False(t, ok)
assert.Nil(t, s)
})
t.Run("Test BackGroundGC", func(t *testing.T) {
collIDCh := make(chan UniqueID)
go node.BackGroundGC(collIDCh)
dmChannelName := "fake-dm-channel-test-BackGroundGC"
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: dmChannelName,
CheckPoints: []*datapb.CheckPoint{},
}
require.Equal(t, 0, len(node.vchan2FlushCh))
require.Equal(t, 0, len(node.vchan2SyncService))
err := node.NewDataSyncService(vchan)
require.NoError(t, err)
time.Sleep(time.Second)
require.Equal(t, 1, len(node.vchan2FlushCh))
require.Equal(t, 1, len(node.vchan2SyncService))
collIDCh <- 1
assert.Eventually(t, func() bool {
return len(node.vchan2FlushCh) == 0
}, time.Second*4, time.Millisecond)
assert.Equal(t, 0, len(node.vchan2SyncService))
s, ok := node.vchan2SyncService[dmChannelName]
assert.False(t, ok)
assert.Nil(t, s)
})
<-node.ctx.Done()
node.Stop()
}
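
The new test cases exercise the asynchronous path with require for preconditions and testify's assert.Eventually to poll until BackGroundGC has done its work, instead of relying only on a fixed sleep. A reduced sketch of that testing pattern, with a stand-in worker and registry rather than the real DataNode, looks roughly like this (place it in a *_test.go file):

```go
package gcsketch

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// registry is a tiny stand-in for the vchan2SyncService map.
type registry struct {
	mu    sync.Mutex
	items map[string]struct{}
}

func (r *registry) len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.items)
}

func TestBackgroundCleanup(t *testing.T) {
	r := &registry{items: map[string]struct{}{"fake-dm-channel": {}}}
	require.Equal(t, 1, r.len())

	signal := make(chan int64)
	go func() { // stand-in for BackGroundGC
		<-signal
		r.mu.Lock()
		delete(r.items, "fake-dm-channel")
		r.mu.Unlock()
	}()

	signal <- 1
	// Eventually polls the condition until it holds or the deadline passes,
	// which suits cleanup that finishes at an unpredictable time.
	assert.Eventually(t, func() bool { return r.len() == 0 },
		4*time.Second, time.Millisecond)
}
```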

@ -27,6 +27,7 @@ import (
type dataSyncService struct {
ctx context.Context
cancelFn context.CancelFunc
fg *flowgraph.TimeTickedFlowGraph
flushChan <-chan *flushMsg
replica Replica
@ -34,6 +35,7 @@ type dataSyncService struct {
msFactory msgstream.Factory
collectionID UniqueID
dataService types.DataService
clearSignal chan<- UniqueID
}
func newDataSyncService(ctx context.Context,
@ -41,10 +43,16 @@ func newDataSyncService(ctx context.Context,
replica Replica,
alloc allocatorInterface,
factory msgstream.Factory,
vchan *datapb.VchannelInfo) *dataSyncService {
vchan *datapb.VchannelInfo,
clearSignal chan<- UniqueID,
) *dataSyncService {
ctx1, cancel := context.WithCancel(ctx)
service := &dataSyncService{
ctx: ctx,
ctx: ctx1,
cancelFn: cancel,
fg: nil,
flushChan: flushChan,
replica: replica,
@ -58,8 +66,8 @@ func newDataSyncService(ctx context.Context,
}
func (dsService *dataSyncService) start() {
log.Debug("Data Sync Service Start Successfully")
if dsService.fg != nil {
log.Debug("Data Sync Service starting flowgraph")
dsService.fg.Start()
} else {
log.Debug("Data Sync Service flowgraph nil")
@ -68,11 +76,14 @@ func (dsService *dataSyncService) start() {
func (dsService *dataSyncService) close() {
if dsService.fg != nil {
log.Debug("Data Sync Service closing flowgraph")
dsService.fg.Close()
}
dsService.cancelFn()
}
func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) {
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
// TODO: add delete pipeline support
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
@ -124,8 +135,9 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) {
}
return nil
}
var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetChannelName(), vchanPair.GetCheckPoints())
var ddNode Node = newDDNode()
var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanInfo.GetChannelName(), vchanInfo.GetCheckPoints())
var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID)
var insertBufferNode Node = newInsertBufferNode(
dsService.ctx,
dsService.replica,
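
With this diff, dataSyncService derives its own context (ctx1, cancel) from the node context and keeps the flowgraph handle, so close() can stop the flowgraph and then cancel everything started under that context without touching other vchannels. A minimal sketch of this per-service ownership pattern, with a dummy flowgraph type standing in for TimeTickedFlowGraph:

```go
package main

import (
	"context"
	"fmt"
)

// flowgraph is a placeholder for flowgraph.TimeTickedFlowGraph.
type flowgraph struct{}

func (f *flowgraph) Start() { fmt.Println("flowgraph started") }
func (f *flowgraph) Close() { fmt.Println("flowgraph closed") }

type syncService struct {
	ctx      context.Context
	cancelFn context.CancelFunc
	fg       *flowgraph
}

// newSyncService derives its own cancellable context from the parent, so this
// service can be torn down individually while the node keeps running.
func newSyncService(parent context.Context) *syncService {
	ctx, cancel := context.WithCancel(parent)
	return &syncService{ctx: ctx, cancelFn: cancel, fg: &flowgraph{}}
}

func (s *syncService) start() {
	if s.fg != nil {
		s.fg.Start()
	}
}

// close stops the flowgraph first, then cancels the derived context so every
// goroutine started under s.ctx observes ctx.Done().
func (s *syncService) close() {
	if s.fg != nil {
		s.fg.Close()
	}
	s.cancelFn()
}

func main() {
	s := newSyncService(context.Background())
	s.start()
	s.close()
	<-s.ctx.Done() // returns immediately once cancelFn has run
	fmt.Println("service context cancelled")
}
```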

@ -61,7 +61,8 @@ func TestDataSyncService_Start(t *testing.T) {
FlushedSegments: []int64{},
}
sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan)
signalCh := make(chan UniqueID, 100)
sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh)
sync.replica.addCollection(collMeta.ID, collMeta.Schema)
go sync.start()

@ -23,6 +23,9 @@ import (
type ddNode struct {
BaseNode
clearSignal chan<- UniqueID
collectionID UniqueID
}
func (ddn *ddNode) Name() string {
@ -62,8 +65,10 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for _, msg := range msMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
// TODO distroy dataSyncService and nodify datanode
log.Error("Distorying current flowgraph")
if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
ddn.clearSignal <- ddn.collectionID
log.Info("Destroying current flowgraph")
}
case commonpb.MsgType_Insert:
resMsg := ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg))
if resMsg != nil {
@ -85,11 +90,13 @@ func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg)
return msg
}
func newDDNode() *ddNode {
func newDDNode(clearSignal chan<- UniqueID, collID UniqueID) *ddNode {
baseNode := BaseNode{}
baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
return &ddNode{
BaseNode: baseNode,
clearSignal: clearSignal,
collectionID: collID,
}
}
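
On the flowgraph side, ddNode now carries its collection ID and the write end of clearSignal, and forwards only a drop that targets its own collection. The branch reduces to something like the sketch below; the message and channel types are simplified stand-ins, and the real channel is buffered by NewDataNode with capacity 100 so the flowgraph should not block on the send.

```go
package main

import "fmt"

type UniqueID = int64

// dropCollectionMsg stands in for msgstream.DropCollectionMsg.
type dropCollectionMsg struct{ collectionID UniqueID }

type ddNode struct {
	collectionID UniqueID
	clearSignal  chan<- UniqueID
}

// handleDrop mirrors the MsgType_DropCollection branch in ddNode.Operate:
// signal the DataNode GC only when the dropped collection is the one this
// flowgraph serves.
func (n *ddNode) handleDrop(msg dropCollectionMsg) {
	if msg.collectionID == n.collectionID {
		n.clearSignal <- n.collectionID
		fmt.Println("destroying current flowgraph for collection", n.collectionID)
	}
}

func main() {
	signal := make(chan UniqueID, 100)
	n := &ddNode{collectionID: 1, clearSignal: signal}
	n.handleDrop(dropCollectionMsg{collectionID: 2}) // ignored: different collection
	n.handleDrop(dropCollectionMsg{collectionID: 1}) // forwarded to BackGroundGC
	fmt.Println("signalled collection:", <-signal)
}
```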

@ -74,7 +74,7 @@ func newIDLEDataNodeMock() *DataNode {
return node
}
func newHEALTHDataNodeMock(dmChannelName, ddChannelName string) *DataNode {
func newHEALTHDataNodeMock(dmChannelName string) *DataNode {
var ctx context.Context
if debug {