mirror of https://github.com/milvus-io/milvus.git
Fix watcher loop quit and channel shouldDrop logic (#23402)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/23412/head
parent
a9a5f76f74
commit
ba84f52119
|
@ -59,6 +59,11 @@ func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan
|
|||
return c.etcdWatcher, c.timeoutWatcher
|
||||
}
|
||||
|
||||
// getWatchersWithRevision creates a watch channel for prefix by passing the
// given revision to watchkv.WatchWithRevision, stores that channel in
// c.etcdWatcher, and returns it together with the timer's timeoutWatcher.
func (c *channelStateTimer) getWatchersWithRevision(prefix string, revision int64) (clientv3.WatchChan, chan *ackEvent) {
|
||||
// Overwrites whatever watch channel was previously stored on the timer.
c.etcdWatcher = c.watchkv.WatchWithRevision(prefix, revision)
|
||||
return c.etcdWatcher, c.timeoutWatcher
|
||||
}
|
||||
|
||||
func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelWatchInfo, error) {
|
||||
prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), strconv.FormatInt(nodeID, 10))
|
||||
|
||||
|
@ -113,7 +118,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
|
|||
case <-ticker.C:
|
||||
// check tickle at path as :tickle/[prefix]/{channel_name}
|
||||
c.removeTimers([]string{channelName})
|
||||
log.Info("timeout and stop timer: wait for channel ACK timeout",
|
||||
log.Warn("timeout and stop timer: wait for channel ACK timeout",
|
||||
zap.String("watch state", watchState.String()),
|
||||
zap.Int64("nodeID", nodeID),
|
||||
zap.String("channel name", channelName),
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
|
@ -164,7 +165,8 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
|
|||
checkerContext, cancel := context.WithCancel(ctx)
|
||||
c.stopChecker = cancel
|
||||
if c.stateChecker != nil {
|
||||
go c.stateChecker(checkerContext)
|
||||
// TODO get revision from reload logic
|
||||
go c.stateChecker(checkerContext, common.LatestRevision)
|
||||
log.Info("starting etcd states checker")
|
||||
}
|
||||
|
||||
|
@ -651,15 +653,21 @@ func (c *ChannelManager) processAck(e *ackEvent) {
|
|||
}
|
||||
}
|
||||
|
||||
type channelStateChecker func(context.Context)
|
||||
type channelStateChecker func(context.Context, int64)
|
||||
|
||||
func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
|
||||
func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context, revision int64) {
|
||||
defer logutil.LogPanic()
|
||||
|
||||
// REF MEP#7 watchInfo paths are organized as: [prefix]/channel/{node_id}/{channel_name}
|
||||
watchPrefix := Params.CommonCfg.DataCoordWatchSubPath.GetValue()
|
||||
// TODO: this is risky; we'd better watch etcd with a revision rather than simply a path
|
||||
etcdWatcher, timeoutWatcher := c.stateTimer.getWatchers(watchPrefix)
|
||||
var etcdWatcher clientv3.WatchChan
|
||||
var timeoutWatcher chan *ackEvent
|
||||
if revision == common.LatestRevision {
|
||||
etcdWatcher, timeoutWatcher = c.stateTimer.getWatchers(watchPrefix)
|
||||
} else {
|
||||
etcdWatcher, timeoutWatcher = c.stateTimer.getWatchersWithRevision(watchPrefix, revision)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -674,14 +682,17 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
|
|||
case event, ok := <-etcdWatcher:
|
||||
if !ok {
|
||||
log.Warn("datacoord failed to watch channel, return")
|
||||
// rewatch on transient network errors; the session handles process quitting if the connection is not recoverable
|
||||
go c.watchChannelStatesLoop(ctx, revision)
|
||||
return
|
||||
}
|
||||
|
||||
if err := event.Err(); err != nil {
|
||||
log.Warn("datacoord watch channel hit error", zap.Error(event.Err()))
|
||||
// https://github.com/etcd-io/etcd/issues/8980
|
||||
// TODO add list and watch with revision
|
||||
if event.Err() == v3rpc.ErrCompacted {
|
||||
go c.watchChannelStatesLoop(ctx)
|
||||
go c.watchChannelStatesLoop(ctx, event.CompactRevision)
|
||||
return
|
||||
}
|
||||
// if the watch loop returns because the event was canceled, the datacoord is not functional anymore
|
||||
|
@ -689,6 +700,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
revision = event.Header.GetRevision() + 1
|
||||
for _, evt := range event.Events {
|
||||
if evt.Type == clientv3.EventTypeDelete {
|
||||
continue
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
)
|
||||
|
||||
// waitAndStore simulates DataNode's action
|
||||
|
@ -114,7 +115,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
chManager.watchChannelStatesLoop(ctx)
|
||||
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -144,7 +145,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
chManager.watchChannelStatesLoop(ctx)
|
||||
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -175,7 +176,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
chManager.watchChannelStatesLoop(ctx)
|
||||
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -213,7 +214,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
chManager.watchChannelStatesLoop(ctx)
|
||||
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -256,7 +257,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
chManager.watchChannelStatesLoop(ctx)
|
||||
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -302,7 +303,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
chManager.watchChannelStatesLoop(ctx)
|
||||
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -348,7 +349,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
|
|||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
chManager.watchChannelStatesLoop(ctx)
|
||||
chManager.watchChannelStatesLoop(ctx, common.LatestRevision)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
|
@ -967,7 +968,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
chManager.stopChecker = cancel
|
||||
defer cancel()
|
||||
go chManager.stateChecker(ctx)
|
||||
go chManager.stateChecker(ctx, common.LatestRevision)
|
||||
|
||||
chManager.store = &ChannelStore{
|
||||
store: metakv,
|
||||
|
|
|
@ -331,10 +331,10 @@ func (c *ChannelStore) GetNodeChannelCount(nodeID int64) int {
|
|||
func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) {
|
||||
for id, info := range c.channelsInfo {
|
||||
if id == nodeID {
|
||||
delete(c.channelsInfo, id)
|
||||
if err := c.remove(nodeID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
delete(c.channelsInfo, id)
|
||||
return info.Channels, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -333,22 +333,8 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
|
|||
|
||||
// CheckShouldDropChannel reports whether the specified channel is marked to be removed or no longer exists in the catalog
|
||||
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
|
||||
/*
|
||||
segments := h.s.meta.GetSegmentsByChannel(channel)
|
||||
for _, segment := range segments {
|
||||
if segment.GetStartPosition() != nil && // filter empty segment
|
||||
// FIXME: we filter compaction generated segments
|
||||
// because datanode may not know the segment due to the network lag or
|
||||
// datacoord crash when handling CompleteCompaction.
|
||||
// FIXME: cancel this limitation for #12265
|
||||
// need to change a unified DropAndFlush to solve the root problem
|
||||
//len(segment.CompactionFrom) == 0 &&
|
||||
segment.GetState() != commonpb.SegmentState_Dropped {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return false*/
|
||||
return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel)
|
||||
return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) ||
|
||||
!h.s.meta.catalog.ChannelExists(h.s.ctx, channel)
|
||||
}
|
||||
|
||||
// FinishDropChannel cleans up the remove flag for channels
|
||||
|
|
|
@ -2483,7 +2483,7 @@ func TestShouldDropChannel(t *testing.T) {
|
|||
})
|
||||
*/
|
||||
t.Run("channel name not in kv", func(t *testing.T) {
|
||||
assert.False(t, svr.handler.CheckShouldDropChannel("ch99"))
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch99"))
|
||||
})
|
||||
|
||||
t.Run("channel in remove flag", func(t *testing.T) {
|
||||
|
@ -2492,10 +2492,6 @@ func TestShouldDropChannel(t *testing.T) {
|
|||
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch1"))
|
||||
})
|
||||
|
||||
t.Run("channel name not matched", func(t *testing.T) {
|
||||
assert.False(t, svr.handler.CheckShouldDropChannel("ch2"))
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetRecoveryInfo(t *testing.T) {
|
||||
|
|
|
@ -293,6 +293,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
|
|||
case event, ok := <-evtChan:
|
||||
if !ok {
|
||||
log.Warn("datanode failed to watch channel, return")
|
||||
go node.StartWatchChannels(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -115,7 +115,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
|
|||
}
|
||||
|
||||
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
|
||||
log.Debug("ddNode in dropMode",
|
||||
log.Info("ddNode in dropMode",
|
||||
zap.String("vChannelName", ddn.vChannelName),
|
||||
zap.Int64("collection ID", ddn.collectionID))
|
||||
return []Msg{}
|
||||
|
|
|
@ -101,3 +101,8 @@ const (
|
|||
func IsSystemField(fieldID int64) bool {
|
||||
// A field counts as a system field when its ID is below StartOfUserFieldID.
return fieldID < StartOfUserFieldID
|
||||
}
|
||||
|
||||
const (
|
||||
// LatestRevision is the magic number for watching from the latest revision
|
||||
LatestRevision = int64(-1)
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue