Add flowgraph manager and event manager (#15102)

1. Add flowgraph manager to manager start and release of dataSyncService
2. Add event manager to manager etcd watchInfo event
  - Make put event able to retry
  - Make delete event able to terminate the retry loop

See also: #14604, #14300

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/15185/head
XuanYang-cn 2022-01-13 14:21:34 +08:00 committed by GitHub
parent 4e956ee10c
commit 008e08a996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 494 additions and 352 deletions

View File

@ -77,6 +77,7 @@ const illegalRequestErrStr = "Illegal request"
// makes sure DataNode implements types.DataNode
var _ types.DataNode = (*DataNode)(nil)
// Params from config.yaml
var Params paramtable.GlobalParamTable
// DataNode communicates with outside services and unioun all
@ -89,9 +90,6 @@ var Params paramtable.GlobalParamTable
// `NodeID` is unique to each datanode.
// `State` is current statement of this data node, indicating whether it's healthy.
//
// `vchan2SyncService` is a map of vchannlName to dataSyncService, so that datanode
// has ability to scale flowgraph.
// `vchan2FlushCh` holds flush-signal channels for every flowgraph.
// `clearSignal` is a signal channel for releasing the flowgraph resources.
// `segmentCache` stores all flushing and flushed segments.
type DataNode struct {
@ -101,10 +99,8 @@ type DataNode struct {
Role string
State atomic.Value // internalpb.StateCode_Initializing
// TODO struct
chanMut sync.RWMutex
vchan2SyncService map[string]*dataSyncService // vchannel name
vchan2FlushChs map[string]chan flushMsg // vchannel name to flush channels
flowgraphManager *flowgraphManager
eventManagerMap sync.Map // vchannel name -> channelEventManager
clearSignal chan string // vchannel name
segmentCache *Cache
@ -138,15 +134,14 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
segmentCache: newCache(),
compactionExecutor: newCompactionExecutor(),
vchan2SyncService: make(map[string]*dataSyncService),
vchan2FlushChs: make(map[string]chan flushMsg),
clearSignal: make(chan string, 100),
flowgraphManager: newFlowgraphManager(),
clearSignal: make(chan string, 100),
}
node.UpdateStateCode(internalpb.StateCode_Abnormal)
return node
}
// Set etcd client
// SetEtcdClient sets etcd client for DataNode
func (node *DataNode) SetEtcdClient(etcdCli *clientv3.Client) {
node.etcdCli = etcdCli
}
@ -287,125 +282,134 @@ func (node *DataNode) checkWatchedList() error {
return err
}
for i, val := range values {
node.handleWatchInfo(keys[i], []byte(val))
node.handleWatchInfo(&event{eventType: putEventType}, keys[i], []byte(val))
}
return nil
}
// handleChannelEvt handles event from kv watch event
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
var e *event
switch evt.Type {
case clientv3.EventTypePut: // datacoord shall put channels needs to be watched here
log.Debug("DataNode handleChannelEvt EventTypePut", zap.String("key", string(evt.Kv.Key)))
node.handleWatchInfo(string(evt.Kv.Key), evt.Kv.Value)
e = &event{
eventType: putEventType,
}
case clientv3.EventTypeDelete:
// guaranteed there is no "/" in channel name
parts := strings.Split(string(evt.Kv.Key), "/")
vchanName := parts[len(parts)-1]
log.Debug("DataNode handleChannelEvt EventTypeDelete",
zap.String("key", string(evt.Kv.Key)),
zap.String("vChanName", vchanName),
zap.Int64("node id", Params.DataNodeCfg.NodeID))
node.ReleaseDataSyncService(vchanName)
e = &event{
eventType: deleteEventType,
}
}
node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value)
}
func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
switch e.eventType {
case putEventType:
log.Info("DataNode is handling watchInfo put event", zap.String("key", key))
watchInfo, err := parsePutEventData(data)
if err != nil {
log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err))
return
}
e.info = watchInfo
e.vChanName = watchInfo.GetVchan().GetChannelName()
case deleteEventType:
log.Info("DataNode is handling watchInfo delete event", zap.String("key", key))
e.vChanName = parseDeleteEventKey(key)
}
actualManager, loaded := node.eventManagerMap.LoadOrStore(e.vChanName, &channelEventManager{
eventChan: make(chan event, 10),
closeChan: make(chan struct{}),
handlePutEvent: node.handlePutEvent,
handleDeleteEvent: node.handleDeleteEvent,
})
if !loaded {
actualManager.(*channelEventManager).Run()
}
actualManager.(*channelEventManager).handleEvent(*e)
// Whenever a delete event comes, this eventManger will be removed from map
if e.eventType == deleteEventType {
if m, loaded := node.eventManagerMap.LoadAndDelete(e.vChanName); loaded {
m.(*channelEventManager).Close()
}
}
}
func (node *DataNode) handleWatchInfo(key string, data []byte) {
func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
watchInfo := datapb.ChannelWatchInfo{}
err := proto.Unmarshal(data, &watchInfo)
if err != nil {
log.Warn("fail to parse ChannelWatchInfo", zap.String("key", key), zap.Error(err))
return
return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err)
}
log.Debug("DataNode handleWatchInfo Unmarshal success", zap.String("key", key))
if watchInfo.State == datapb.ChannelWatchState_Complete {
log.Warn("DataNode handleWatchInfo State is already ChannelWatchState_Complete", zap.String("key", key))
return
return nil, fmt.Errorf("invalid event: event state is already ChannelWatchState_Compele")
}
if watchInfo.Vchan == nil {
log.Warn("found ChannelWatchInfo with nil VChannelInfo", zap.String("key", key))
return
return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
}
log.Warn("DataNode handleWatchInfo try to NewDataSyncService", zap.String("key", key))
err = node.NewDataSyncService(watchInfo.Vchan)
if err != nil {
log.Warn("fail to create DataSyncService", zap.String("key", key), zap.Error(err))
return
}
log.Warn("DataNode handleWatchInfo NewDataSyncService success", zap.String("key", key))
watchInfo.State = datapb.ChannelWatchState_Complete
v, err := proto.Marshal(&watchInfo)
if err != nil {
log.Warn("DataNode handleWatchInfo fail to Marshal watchInfo", zap.String("key", key), zap.Error(err))
return
}
k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), watchInfo.GetVchan().GetChannelName())
log.Warn("DataNode handleWatchInfo try to Save", zap.String("key", key),
zap.String("k", k),
zap.String("v", string(v)))
err = node.watchKv.Save(k, string(v))
if err != nil {
log.Warn("DataNode handleWatchInfo fail to change WatchState to complete", zap.String("key", key), zap.Error(err))
node.ReleaseDataSyncService(key)
}
return &watchInfo, nil
}
// NewDataSyncService adds a new dataSyncService for new dmlVchannel and starts dataSyncService.
func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
node.chanMut.RLock()
if _, ok := node.vchan2SyncService[vchan.GetChannelName()]; ok {
node.chanMut.RUnlock()
return nil
}
node.chanMut.RUnlock()
func parseDeleteEventKey(key string) string {
parts := strings.Split(key, "/")
vChanName := parts[len(parts)-1]
return vChanName
}
replica, err := newReplica(node.ctx, node.rootCoord, vchan.CollectionID)
func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo) error {
vChanName := watchInfo.GetVchan().GetChannelName()
if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan()); err != nil {
return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err)
}
log.Debug("handle put event: new data sync service success", zap.String("vChanName", vChanName))
watchInfo.State = datapb.ChannelWatchState_Complete
v, err := proto.Marshal(watchInfo)
if err != nil {
return err
return fmt.Errorf("fail to marshal watchInfo with complete state, vChanName: %s, err: %v", vChanName, err)
}
var alloc allocatorInterface = newAllocator(node.rootCoord)
k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), vChanName)
log.Debug("handle put event: try to save completed state", zap.String("key", k))
log.Debug("DataNode NewDataSyncService received Vchannel Info",
zap.Int64("collectionID", vchan.CollectionID),
zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())),
zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())),
)
flushCh := make(chan flushMsg, 100)
dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv, node.compactionExecutor)
err = node.watchKv.Save(k, string(v))
// TODO DataNode unable to save into etcd, may need to panic
if err != nil {
log.Error("DataNode NewDataSyncService newDataSyncService failed",
zap.Error(err),
)
return err
node.releaseFlowgraph(vChanName)
return fmt.Errorf("fail to update completed state to etcd, vChanName: %s, err: %v", vChanName, err)
}
node.chanMut.Lock()
node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService
node.vchan2FlushChs[vchan.GetChannelName()] = flushCh
node.chanMut.Unlock()
log.Info("DataNode NewDataSyncService success",
zap.Int64("Collection ID", vchan.GetCollectionID()),
zap.String("Vchannel name", vchan.GetChannelName()),
)
dataSyncService.start()
return nil
}
func (node *DataNode) handleDeleteEvent(vChanName string) {
node.releaseFlowgraph(vChanName)
}
func (node *DataNode) releaseFlowgraph(vChanName string) {
node.flowgraphManager.release(vChanName)
}
// BackGroundGC runs in background to release datanode resources
func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
log.Info("DataNode Background GC Start")
for {
select {
case vChan := <-vChannelCh:
log.Info("GC flowgraph", zap.String("vChan", vChan))
node.ReleaseDataSyncService(vChan)
case vchanName := <-vChannelCh:
log.Info("GC flowgraph", zap.String("vChanName", vchanName))
node.releaseFlowgraph(vchanName)
case <-node.ctx.Done():
log.Info("DataNode ctx done")
return
@ -413,23 +417,6 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
}
}
// ReleaseDataSyncService release flowgraph resources for a vchanName
func (node *DataNode) ReleaseDataSyncService(vchanName string) {
log.Info("Release flowgraph resources begin", zap.String("Vchannel", vchanName))
node.chanMut.Lock()
dss, ok := node.vchan2SyncService[vchanName]
delete(node.vchan2SyncService, vchanName)
delete(node.vchan2FlushChs, vchanName)
node.chanMut.Unlock()
if ok {
// This is a time-consuming process, better to put outside of the lock
dss.close()
}
log.Debug("Release flowgraph resources end", zap.String("Vchannel", vchanName))
}
// FilterThreshold is the start time ouf DataNode
var FilterThreshold Timestamp
@ -541,51 +528,11 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo
return states, nil
}
func (node *DataNode) getChannelNamebySegmentID(segID UniqueID) string {
node.chanMut.RLock()
defer node.chanMut.RUnlock()
for name, dataSync := range node.vchan2SyncService {
if dataSync.replica.hasSegment(segID, true) {
return name
}
}
return ""
}
func (node *DataNode) getChannelNamesbyCollectionID(collID UniqueID) []string {
node.chanMut.RLock()
defer node.chanMut.RUnlock()
channels := make([]string, 0, len(node.vchan2SyncService))
for name, dataSync := range node.vchan2SyncService {
if dataSync.collectionID == collID {
channels = append(channels, name)
}
}
return channels
}
// ReadyToFlush tells wether DataNode is ready for flushing
func (node *DataNode) ReadyToFlush() error {
if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
return errors.New("DataNode not in HEALTHY state")
}
node.chanMut.RLock()
defer node.chanMut.RUnlock()
if len(node.vchan2SyncService) == 0 && len(node.vchan2FlushChs) == 0 {
// Healthy but Idle
msg := "DataNode HEALTHY but IDLE, please try WatchDmChannels to make it work"
log.Warn(msg)
return errors.New(msg)
}
if len(node.vchan2SyncService) != len(node.vchan2FlushChs) {
// TODO restart
msg := "DataNode HEALTHY but abnormal inside, restarting..."
log.Warn(msg)
return errors.New(msg)
}
return nil
}
@ -600,8 +547,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
if err := node.ReadyToFlush(); err != nil {
status.Reason = err.Error()
if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
status.Reason = "DataNode not in HEALTHY state"
return status, nil
}
@ -613,16 +560,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
processSegments := func(segmentIDs []UniqueID, flushed bool) bool {
noErr := true
for _, id := range segmentIDs {
chanName := node.getChannelNamebySegmentID(id)
if len(chanName) == 0 {
log.Warn("FlushSegments failed, cannot find segment in DataNode replica",
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", id))
status.Reason = fmt.Sprintf("DataNode replica not find segment %d!", id)
noErr = false
continue
}
if node.segmentCache.checkIfCached(id) {
// Segment in flushing, ignore
log.Info("Segment flushing, ignore the flush request until flush is done.",
@ -633,17 +570,15 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
node.segmentCache.Cache(id)
node.chanMut.RLock()
flushChs, ok := node.vchan2FlushChs[chanName]
node.chanMut.RUnlock()
if !ok {
flushCh, err := node.flowgraphManager.getFlushCh(id)
if err != nil {
status.Reason = "DataNode abnormal, restarting"
log.Error("DataNode abnormal, no flushCh for a vchannel")
log.Error("DataNode abnormal, no flushCh for a vchannel", zap.Error(err))
noErr = false
continue
}
flushChs <- flushMsg{
flushCh <- flushMsg{
msgID: req.Base.MsgID,
timestamp: req.Base.Timestamp,
segmentID: id,
@ -677,15 +612,7 @@ func (node *DataNode) Stop() error {
node.UpdateStateCode(internalpb.StateCode_Abnormal)
node.cancel()
node.chanMut.RLock()
defer node.chanMut.RUnlock()
// close services
for _, syncService := range node.vchan2SyncService {
if syncService != nil {
(*syncService).close()
}
}
node.flowgraphManager.dropAll()
if node.closer != nil {
err := node.closer.Close()
@ -796,7 +723,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
ds, ok := node.vchan2SyncService[req.GetChannel()]
ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel())
if !ok {
log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel()))
status.Reason = errIllegalCompactionPlan.Error()

View File

@ -29,11 +29,16 @@ import (
"time"
"github.com/milvus-io/milvus/internal/common"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
@ -42,12 +47,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
func TestMain(t *testing.M) {
@ -62,6 +61,8 @@ func TestMain(t *testing.M) {
func TestDataNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := newIDLEDataNodeMock(ctx)
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
@ -132,39 +133,6 @@ func TestDataNode(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, stat.Status.ErrorCode)
})
t.Run("Test NewDataSyncService", func(t *testing.T) {
t.Skip()
ctx, cancel := context.WithCancel(context.Background())
node2 := newIDLEDataNodeMock(ctx)
err = node2.Start()
assert.Nil(t, err)
dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-NewDataSyncService"
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: dmChannelName,
UnflushedSegments: []*datapb.SegmentInfo{},
}
require.Equal(t, 0, len(node2.vchan2FlushChs))
require.Equal(t, 0, len(node2.vchan2SyncService))
err := node2.NewDataSyncService(vchan)
assert.NoError(t, err)
assert.Equal(t, 1, len(node2.vchan2FlushChs))
assert.Equal(t, 1, len(node2.vchan2SyncService))
err = node2.NewDataSyncService(vchan)
assert.NoError(t, err)
assert.Equal(t, 1, len(node2.vchan2FlushChs))
assert.Equal(t, 1, len(node2.vchan2SyncService))
cancel()
<-node2.ctx.Done()
err = node2.Stop()
assert.Nil(t, err)
})
t.Run("Test FlushSegments", func(t *testing.T) {
dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments"
@ -185,12 +153,14 @@ func TestDataNode(t *testing.T) {
UnflushedSegments: []*datapb.SegmentInfo{},
FlushedSegments: []*datapb.SegmentInfo{},
}
err := node1.NewDataSyncService(vchan)
assert.Nil(t, err)
service, ok := node1.vchan2SyncService[dmChannelName]
err := node1.flowgraphManager.addAndStart(node1, vchan)
require.Nil(t, err)
fgservice, ok := node1.flowgraphManager.getFlowgraphService(dmChannelName)
assert.True(t, ok)
err = service.replica.addNewSegment(0, 1, 1, dmChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
err = fgservice.replica.addNewSegment(0, 1, 1, dmChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
assert.Nil(t, err)
req := &datapb.FlushSegmentsRequest{
@ -282,25 +252,6 @@ func TestDataNode(t *testing.T) {
status, err = node1.FlushSegments(node1.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
// manual inject meta error
node1.chanMut.Lock()
node1.vchan2FlushChs[dmChannelName+"1"] = node1.vchan2FlushChs[dmChannelName]
delete(node1.vchan2FlushChs, dmChannelName)
node1.chanMut.Unlock()
node1.segmentCache.Remove(0)
req = &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{},
DbID: 0,
CollectionID: 1,
SegmentIDs: []int64{0},
}
status, err = node1.FlushSegments(node1.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
})
t.Run("Test GetTimeTickChannel", func(t *testing.T) {
@ -383,99 +334,13 @@ func TestDataNode(t *testing.T) {
for i, test := range testDataSyncs {
if i <= 2 {
err = node.NewDataSyncService(&datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName})
err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName})
assert.Nil(t, err)
vchanNameCh <- test.dmChannelName
}
}
assert.Eventually(t, func() bool {
node.chanMut.Lock()
defer node.chanMut.Unlock()
return len(node.vchan2FlushChs) == 0
}, time.Second, time.Millisecond)
cancel()
})
t.Run("Test ReleaseDataSyncService", func(t *testing.T) {
dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-NewDataSyncService"
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: dmChannelName,
UnflushedSegments: []*datapb.SegmentInfo{},
}
err := node.NewDataSyncService(vchan)
require.NoError(t, err)
require.Equal(t, 1, len(node.vchan2FlushChs))
require.Equal(t, 1, len(node.vchan2SyncService))
time.Sleep(100 * time.Millisecond)
node.ReleaseDataSyncService(dmChannelName)
assert.Equal(t, 0, len(node.vchan2FlushChs))
assert.Equal(t, 0, len(node.vchan2SyncService))
s, ok := node.vchan2SyncService[dmChannelName]
assert.False(t, ok)
assert.Nil(t, s)
})
t.Run("Test GetChannelName", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
node := newIDLEDataNodeMock(ctx)
testCollIDs := []UniqueID{0, 1, 2, 1}
testSegIDs := []UniqueID{10, 11, 12, 13}
testchanNames := []string{"a", "b", "c", "d"}
node.chanMut.Lock()
for i, name := range testchanNames {
replica := &SegmentReplica{
collectionID: testCollIDs[i],
newSegments: make(map[UniqueID]*Segment),
}
err = replica.addNewSegment(testSegIDs[i], testCollIDs[i], 0, name, &internalpb.MsgPosition{}, nil)
assert.Nil(t, err)
node.vchan2SyncService[name] = &dataSyncService{collectionID: testCollIDs[i], replica: replica}
}
node.chanMut.Unlock()
type Test struct {
inCollID UniqueID
expectedChannels []string
inSegID UniqueID
expectedChannel string
}
tests := []Test{
{0, []string{"a"}, 10, "a"},
{1, []string{"b", "d"}, 11, "b"},
{2, []string{"c"}, 12, "c"},
{3, []string{}, 13, "d"},
{3, []string{}, 100, ""},
}
for _, test := range tests {
actualChannels := node.getChannelNamesbyCollectionID(test.inCollID)
assert.ElementsMatch(t, test.expectedChannels, actualChannels)
actualChannel := node.getChannelNamebySegmentID(test.inSegID)
assert.Equal(t, test.expectedChannel, actualChannel)
}
cancel()
})
cancel()
<-node.ctx.Done()
err = node.Stop()
require.Nil(t, err)
}
func TestWatchChannel(t *testing.T) {
@ -495,6 +360,7 @@ func TestWatchChannel(t *testing.T) {
defer cancel()
t.Run("test watch channel", func(t *testing.T) {
// GOOSE TODO
kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath)
oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid"
path := fmt.Sprintf("%s/%d/%s", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID, oldInvalidCh)
@ -540,29 +406,27 @@ func TestWatchChannel(t *testing.T) {
// wait for check goroutine received 2 events
<-c
node.chanMut.RLock()
_, has := node.vchan2SyncService[ch]
node.chanMut.RUnlock()
assert.True(t, has)
exist := node.flowgraphManager.exist(ch)
assert.True(t, exist)
err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID))
assert.Nil(t, err)
//TODO there is not way to sync Release done, use sleep for now
time.Sleep(100 * time.Millisecond)
node.chanMut.RLock()
_, has = node.vchan2SyncService[ch]
node.chanMut.RUnlock()
assert.False(t, has)
exist = node.flowgraphManager.exist(ch)
assert.False(t, exist)
})
t.Run("handle watch info failed", func(t *testing.T) {
node.handleWatchInfo("test1", []byte{23})
e := &event{
eventType: putEventType,
}
node.chanMut.RLock()
_, has := node.vchan2SyncService["test1"]
assert.False(t, has)
node.chanMut.RUnlock()
node.handleWatchInfo(e, "test1", []byte{23})
exist := node.flowgraphManager.exist("test1")
assert.False(t, exist)
info := datapb.ChannelWatchInfo{
Vchan: nil,
@ -570,12 +434,10 @@ func TestWatchChannel(t *testing.T) {
}
bs, err := proto.Marshal(&info)
assert.NoError(t, err)
node.handleWatchInfo("test2", bs)
node.handleWatchInfo(e, "test2", bs)
node.chanMut.RLock()
_, has = node.vchan2SyncService["test2"]
assert.False(t, has)
node.chanMut.RUnlock()
exist = node.flowgraphManager.exist("test2")
assert.False(t, exist)
info = datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
@ -587,12 +449,9 @@ func TestWatchChannel(t *testing.T) {
node.msFactory = &FailMessageStreamFactory{
node.msFactory,
}
node.handleWatchInfo("test3", bs)
node.chanMut.RLock()
_, has = node.vchan2SyncService["test3"]
assert.False(t, has)
node.chanMut.RUnlock()
node.handleWatchInfo(e, "test3", bs)
exist = node.flowgraphManager.exist("test3")
assert.False(t, exist)
})
}

View File

@ -0,0 +1,101 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
)
const retryWatchInterval = 20 * time.Second
type event struct {
eventType int
vChanName string
info *datapb.ChannelWatchInfo
}
type channelEventManager struct {
eventChan chan event
closeChan chan struct{}
handlePutEvent func(watchInfo *datapb.ChannelWatchInfo) error // node.handlePutEvent
handleDeleteEvent func(vChanName string) // node.handleDeleteEvent
}
const (
putEventType = 1
deleteEventType = 2
)
func (e *channelEventManager) Run() {
go func() {
for {
select {
case event := <-e.eventChan:
switch event.eventType {
case putEventType:
// Trigger retry for-loop when fail to handle put event for the first time
if err := e.handlePutEvent(event.info); err != nil {
for {
log.Warn("handle put event fail, starting retry",
zap.String("vChanName", event.vChanName),
zap.String("retry interval", retryWatchInterval.String()),
zap.Error(err))
<-time.NewTimer(time.Second).C
select {
case e, ok := <-e.eventChan:
// When getting a delete event at next retry, exit retry loop
// When getting a put event, just continue the retry
if ok && e.eventType == deleteEventType {
log.Warn("delete event triggerred, terminating retry.",
zap.String("vChanName", event.vChanName))
return
}
default:
}
err = e.handlePutEvent(event.info)
if err == nil {
log.Debug("retry to handle put event successfully",
zap.String("vChanName", event.vChanName))
return
}
}
}
case deleteEventType:
e.handleDeleteEvent(event.vChanName)
}
case <-e.closeChan:
return
}
}
}()
}
func (e *channelEventManager) handleEvent(event event) {
e.eventChan <- event
}
func (e *channelEventManager) Close() {
close(e.closeChan)
}

View File

@ -0,0 +1,127 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"fmt"
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
)
type flowgraphManager struct {
flowgraphs sync.Map // vChannelName -> dataSyncService
}
func newFlowgraphManager() *flowgraphManager {
return &flowgraphManager{}
}
func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo) error {
log.Debug("received Vchannel Info",
zap.String("vChannelName", vchan.GetChannelName()),
zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())),
zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())),
)
if _, ok := fm.flowgraphs.Load(vchan.GetChannelName()); ok {
log.Warn("try to add an existed DataSyncService", zap.String("vChannelName", vchan.GetChannelName()))
return nil
}
replica, err := newReplica(dn.ctx, dn.rootCoord, vchan.GetCollectionID())
if err != nil {
log.Warn("new replica failed", zap.String("vChannelName", vchan.GetChannelName()), zap.Error(err))
return err
}
var alloc allocatorInterface = newAllocator(dn.rootCoord)
dataSyncService, err := newDataSyncService(dn.ctx, make(chan flushMsg, 100), replica, alloc, dn.msFactory, vchan, dn.clearSignal, dn.dataCoord, dn.segmentCache, dn.blobKv, dn.compactionExecutor)
if err != nil {
log.Warn("new data sync service fail", zap.String("vChannelName", vchan.GetChannelName()), zap.Error(err))
return err
}
log.Info("successfully created dataSyncService", zap.String("vChannelName", vchan.GetChannelName()))
dataSyncService.start()
log.Info("successfully started dataSyncService", zap.String("vChannelName", vchan.GetChannelName()))
fm.flowgraphs.Store(vchan.GetChannelName(), dataSyncService)
return nil
}
func (fm *flowgraphManager) release(vchanName string) {
log.Debug("release flowgraph resources begin", zap.String("vChannelName", vchanName))
if fg, loaded := fm.flowgraphs.LoadAndDelete(vchanName); loaded {
fg.(*dataSyncService).close()
}
log.Debug("release flowgraph resources end", zap.String("Vchannel", vchanName))
}
func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) {
var (
flushCh chan flushMsg
loaded = false
)
fm.flowgraphs.Range(func(key, value interface{}) bool {
fg := value.(*dataSyncService)
if fg.replica.hasSegment(segID, true) {
loaded = true
flushCh = fg.flushCh
return false
}
return true
})
if loaded {
return flushCh, nil
}
return nil, fmt.Errorf("cannot find segment %d in all flowgraphs", segID)
}
func (fm *flowgraphManager) getFlowgraphService(vchan string) (*dataSyncService, bool) {
fg, ok := fm.flowgraphs.Load(vchan)
if ok {
return fg.(*dataSyncService), ok
}
return nil, ok
}
func (fm *flowgraphManager) exist(vchan string) bool {
_, exist := fm.getFlowgraphService(vchan)
return exist
}
func (fm *flowgraphManager) dropAll() {
log.Debug("start drop all flowgraph resources in DataNode")
fm.flowgraphs.Range(func(key, value interface{}) bool {
value.(*dataSyncService).close()
fm.flowgraphs.Delete(key.(string))
log.Debug("successfully dropped flowgraph", zap.String("vChannelName", key.(string)))
return true
})
log.Debug("end drop all flowgraph resources in DataNode")
}

View File

@ -0,0 +1,128 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFlowGraphManager(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams)
assert.Nil(t, err)
defer etcdCli.Close()
node := newIDLEDataNodeMock(ctx)
node.SetEtcdClient(etcdCli)
err = node.Init()
require.Nil(t, err)
err = node.Start()
require.Nil(t, err)
fm := newFlowgraphManager()
defer fm.dropAll()
t.Run("Test addAndStart", func(t *testing.T) {
vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-addAndStart"
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: vchanName,
}
require.False(t, fm.exist(vchanName))
err := fm.addAndStart(node, vchan)
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
fm.dropAll()
})
t.Run("Test Release", func(t *testing.T) {
vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-Release"
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: vchanName,
}
require.False(t, fm.exist(vchanName))
err := fm.addAndStart(node, vchan)
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
fm.release(vchanName)
assert.False(t, fm.exist(vchanName))
fm.dropAll()
})
t.Run("Test getFlushCh", func(t *testing.T) {
vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-getFlushCh"
vchan := &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: vchanName,
}
require.False(t, fm.exist(vchanName))
err := fm.addAndStart(node, vchan)
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
fg, ok := fm.getFlowgraphService(vchanName)
require.True(t, ok)
err = fg.replica.addNewSegment(100, 1, 10, vchanName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
tests := []struct {
isvalid bool
inSegID UniqueID
description string
}{
{true, 100, "valid input for existed segmentID 100"},
{false, 101, "invalid input for not existed segmentID 101"},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ch, err := fm.getFlushCh(test.inSegID)
if test.isvalid {
assert.NoError(t, err)
assert.NotNil(t, ch)
} else {
assert.Error(t, err)
assert.Nil(t, ch)
}
})
}
})
t.Run("Test getFlowgraphService", func(t *testing.T) {
fg, ok := fm.getFlowgraphService("channel-not-exist")
assert.False(t, ok)
assert.Nil(t, fg)
})
}