Adopt the extended state in DataCoord (#16200) (#16244)

A ChannelWatchInfo in ToWatch or ToRelease state triggers a timer.
ChannelManager now reacts to the different ChannelWatch states (see the sketch below):

- WatchSuccess > log this info

- WatchFailure/WatchTimeout > ToRelease

- ReleaseSuccess > Delete, reassign if not from DropCollection

- ReleaseFailure/ReleaseTimeout > Cleanup subscription and Delete,
  reassign if not from DropCollection.

Some Notes:
1. Reassignment will add the channel to the buffer if there's only one node left.
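
A condensed sketch of the reaction logic (mirrors processAck introduced
below; logging and error handling omitted):

    func (c *ChannelManager) processAck(e *ackEvent) {
        c.stateTimer.stopIfExsit(e) // stop the pending timer, if any
        switch e.ackType {
        case watchSuccessAck:
            // watch succeeded: nothing to do beyond logging
        case watchFailAck, watchTimeoutAck:
            c.Release(e.nodeID, e.channelName) // write ToRelease
        case releaseSuccessAck:
            c.toDelete(e.nodeID, e.channelName) // delete, reassign unless from DropCollection
        case releaseFailAck, releaseTimeoutAck:
            c.cleanUpAndDelete(e.nodeID, e.channelName) // clean up subscription, delete, reassign
        }
    }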

See also: #15846

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/16327/head
XuanYang-cn 2022-03-29 17:09:28 +08:00 committed by GitHub
parent 20d89afcf2
commit 6f437eb312
11 changed files with 1610 additions and 86 deletions


@ -31,4 +31,4 @@ AlignTrailingComments: true
SortIncludes: false
Standard: Latest
AlignAfterOpenBracket: Align
BinPackParameters: false
BinPackParameters: false


@ -0,0 +1,178 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"fmt"
"path"
"strconv"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
type channelStateTimer struct {
watchkv kv.MetaKv
runningTimers sync.Map // channel name to timer stop channels
etcdWatcher clientv3.WatchChan
timeoutWatcher chan *ackEvent
}
func newChannelStateTimer(kv kv.MetaKv) *channelStateTimer {
return &channelStateTimer{
watchkv: kv,
timeoutWatcher: make(chan *ackEvent, 20),
}
}
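// getWatchers lazily creates an etcd watcher on the given prefix and returns it along with the timeout-ack channel.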
func (c *channelStateTimer) getWatchers(prefix string) (clientv3.WatchChan, chan *ackEvent) {
if c.etcdWatcher == nil {
c.etcdWatcher = c.watchkv.WatchWithPrefix(prefix)
}
return c.etcdWatcher, c.timeoutWatcher
}
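// loadAllChannels loads all ChannelWatchInfos stored under the given nodeID.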
func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelWatchInfo, error) {
prefix := path.Join(Params.DataCoordCfg.ChannelWatchSubPath, strconv.FormatInt(nodeID, 10))
// TODO: change to LoadWithPrefixBytes
keys, values, err := c.watchkv.LoadWithPrefix(prefix)
if err != nil {
return nil, err
}
ret := []*datapb.ChannelWatchInfo{}
for i, k := range keys {
watchInfo, err := parseWatchInfo(k, []byte(values[i]))
if err != nil {
// TODO: delete this kv later
log.Warn("invalid watchInfo loaded", zap.Error(err))
continue
}
ret = append(ret, watchInfo)
}
return ret, nil
}
// startOne starts a timeout timer for a channel in ToWatch or ToRelease state.
func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channelName string, nodeID UniqueID, timeoutTs int64) {
if timeoutTs == 0 {
log.Debug("zero timeoutTs, skip starting timer",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
)
return
}
stop := make(chan struct{})
c.runningTimers.Store(channelName, stop)
timeoutT := time.Unix(0, timeoutTs)
go func() {
log.Debug("timer started",
zap.String("watch state", watchState.String()),
zap.Int64("nodeID", nodeID),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
select {
case <-time.NewTimer(time.Until(timeoutT)).C:
log.Info("timeout and stop timer: wait for channel ACK timeout",
zap.String("state", watchState.String()),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
ackType := getAckType(watchState)
c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
case <-stop:
log.Debug("stop timer before timeout",
zap.String("state", watchState.String()),
zap.String("channel name", channelName),
zap.Time("timeout time", timeoutT))
}
}()
}
func (c *channelStateTimer) notifyTimeoutWatcher(e *ackEvent) {
c.timeoutWatcher <- e
}
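// removeTimers stops and removes the running timers for the given channels.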
func (c *channelStateTimer) removeTimers(channels []string) {
for _, channel := range channels {
if stop, ok := c.runningTimers.LoadAndDelete(channel); ok {
close(stop.(chan struct{}))
}
}
}
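// stopIfExsit stops the running timer for the acked channel, unless the ack itself is a timeout ack (the timer already fired).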
func (c *channelStateTimer) stopIfExsit(e *ackEvent) {
stop, ok := c.runningTimers.LoadAndDelete(e.channelName)
if ok && e.ackType != watchTimeoutAck && e.ackType != releaseTimeoutAck {
close(stop.(chan struct{}))
}
}
func parseWatchInfo(key string, data []byte) (*datapb.ChannelWatchInfo, error) {
watchInfo := datapb.ChannelWatchInfo{}
if err := proto.Unmarshal(data, &watchInfo); err != nil {
return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, key: %s, err: %v", key, err)
}
if watchInfo.Vchan == nil {
return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo, key: %s", key)
}
return &watchInfo, nil
}
// parseAckEvent converts a ChannelWatchInfo loaded from etcd into an ackEvent
func parseAckEvent(nodeID UniqueID, info *datapb.ChannelWatchInfo) *ackEvent {
ret := &ackEvent{
ackType: getAckType(info.GetState()),
channelName: info.GetVchan().GetChannelName(),
nodeID: nodeID,
}
return ret
}
func getAckType(state datapb.ChannelWatchState) ackType {
switch state {
case datapb.ChannelWatchState_WatchSuccess, datapb.ChannelWatchState_Complete:
return watchSuccessAck
case datapb.ChannelWatchState_WatchFailure:
return watchFailAck
case datapb.ChannelWatchState_ReleaseSuccess:
return releaseSuccessAck
case datapb.ChannelWatchState_ReleaseFailure:
return releaseFailAck
case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete: // unchanged watch states generate timeout acks
return watchTimeoutAck
case datapb.ChannelWatchState_ToRelease: // unchanged watch states generate timeout acks
return releaseTimeoutAck
default:
return invalidAck
}
}
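
For orientation, a minimal usage sketch of channelStateTimer (the kv handle,
channel name, and node ID are illustrative; compare the tests that follow):

timer := newChannelStateTimer(kv) // kv is any kv.MetaKv
_, timeoutCh := timer.getWatchers(Params.DataCoordCfg.ChannelWatchSubPath)
// Start a ToWatch timer for node 1 that fires in 20s unless stopped first.
timeoutTs := time.Now().Add(20 * time.Second).UnixNano()
timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-1", 1, timeoutTs)
// A non-timeout ack stops the timer ...
timer.stopIfExsit(&ackEvent{watchSuccessAck, "channel-1", 1})
// ... otherwise a timeout ack is eventually delivered on timeoutCh:
// e := <-timeoutCh // e.ackType == watchTimeoutAck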


@ -0,0 +1,198 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"path"
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/golang/protobuf/proto"
)
func TestChannelStateTimer(t *testing.T) {
kv := getMetaKv(t)
defer kv.Close()
prefix := Params.DataCoordCfg.ChannelWatchSubPath
t.Run("test getWatcher", func(t *testing.T) {
timer := newChannelStateTimer(kv)
etcdCh, timeoutCh := timer.getWatchers(prefix)
assert.NotNil(t, etcdCh)
assert.NotNil(t, timeoutCh)
timer.getWatchers(prefix)
assert.NotNil(t, etcdCh)
assert.NotNil(t, timeoutCh)
})
t.Run("test loadAllChannels", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
timer := newChannelStateTimer(kv)
timer.loadAllChannels(1)
validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)
prefix = Params.DataCoordCfg.ChannelWatchSubPath
prepareKvs := map[string]string{
path.Join(prefix, "1/channel-1"): "invalidWatchInfo",
path.Join(prefix, "1/channel-2"): string(validData),
path.Join(prefix, "2/channel-3"): string(validData),
}
err = kv.MultiSave(prepareKvs)
require.NoError(t, err)
tests := []struct {
inNodeID UniqueID
outLen int
}{
{1, 1},
{2, 1},
{3, 0},
}
for _, test := range tests {
infos, err := timer.loadAllChannels(test.inNodeID)
assert.NoError(t, err)
assert.Equal(t, test.outLen, len(infos))
}
})
t.Run("test startOne", func(t *testing.T) {
normalTimeoutTs := time.Now().Add(20 * time.Second).UnixNano()
nowTimeoutTs := time.Now().UnixNano()
zeroTimeoutTs := int64(0)
tests := []struct {
channelName string
timeoutTs int64
description string
}{
{"channel-1", normalTimeoutTs, "test stop"},
{"channel-2", nowTimeoutTs, "test timeout"},
{"channel-3", zeroTimeoutTs, "not start"},
}
timer := newChannelStateTimer(kv)
_, timeoutCh := timer.getWatchers(prefix)
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
timer.startOne(datapb.ChannelWatchState_ToWatch, test.channelName, 1, test.timeoutTs)
if test.timeoutTs == nowTimeoutTs {
e := <-timeoutCh
assert.Equal(t, watchTimeoutAck, e.ackType)
assert.Equal(t, test.channelName, e.channelName)
} else {
timer.stopIfExsit(&ackEvent{watchSuccessAck, test.channelName, 1})
}
})
}
timer.startOne(datapb.ChannelWatchState_ToWatch, "channel-remove", 1, normalTimeoutTs)
timer.removeTimers([]string{"channel-remove"})
})
}
func TestChannelStateTimer_parses(t *testing.T) {
const (
ValidTest = true
InValidTest = false
)
t.Run("test parseWatchInfo", func(t *testing.T) {
validWatchInfo := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{},
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_ToWatch,
TimeoutTs: time.Now().Add(20 * time.Millisecond).UnixNano(),
}
validData, err := proto.Marshal(&validWatchInfo)
require.NoError(t, err)
invalidDataUnableToMarshal := []byte("invalidData")
invalidWatchInfoNilVchan := validWatchInfo
invalidWatchInfoNilVchan.Vchan = nil
invalidDataNilVchan, err := proto.Marshal(&invalidWatchInfoNilVchan)
require.NoError(t, err)
tests := []struct {
inKey string
inData []byte
isValid bool
description string
}{
{"key", validData, ValidTest, "test with valid watchInfo"},
{"key", invalidDataUnableToMarshal, InValidTest, "test with watchInfo unable to marshal"},
{"key", invalidDataNilVchan, InValidTest, "test with watchInfo with nil Vchan"},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
info, err := parseWatchInfo(test.inKey, test.inData)
if test.isValid {
assert.NoError(t, err)
assert.NotNil(t, info)
assert.Equal(t, info.GetState(), validWatchInfo.GetState())
assert.Equal(t, info.GetStartTs(), validWatchInfo.GetStartTs())
assert.Equal(t, info.GetTimeoutTs(), validWatchInfo.GetTimeoutTs())
} else {
assert.Nil(t, info)
assert.Error(t, err)
}
})
}
})
t.Run("test getAckType", func(t *testing.T) {
tests := []struct {
inState datapb.ChannelWatchState
outAckType ackType
}{
{datapb.ChannelWatchState_WatchSuccess, watchSuccessAck},
{datapb.ChannelWatchState_WatchFailure, watchFailAck},
{datapb.ChannelWatchState_ToWatch, watchTimeoutAck},
{datapb.ChannelWatchState_Uncomplete, watchTimeoutAck},
{datapb.ChannelWatchState_ReleaseSuccess, releaseSuccessAck},
{datapb.ChannelWatchState_ReleaseFailure, releaseFailAck},
{datapb.ChannelWatchState_ToRelease, releaseTimeoutAck},
{100, invalidAck},
}
for _, test := range tests {
assert.Equal(t, test.outAckType, getAckType(test.inState))
}
})
}


@ -26,6 +26,10 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/logutil"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"stathat.com/c/consistent"
)
@ -47,6 +51,10 @@ type ChannelManager struct {
reassignPolicy ChannelReassignPolicy
bgChecker ChannelBGChecker
msgstreamFactory msgstream.Factory
stateChecker channelStateChecker
stopChecker context.CancelFunc
stateTimer *channelStateTimer
}
type channel struct {
@ -69,16 +77,21 @@ func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt {
return func(c *ChannelManager) { c.msgstreamFactory = f }
}
func withStateChecker() ChannelManagerOpt {
return func(c *ChannelManager) { c.stateChecker = c.watchChannelStatesLoop }
}
// NewChannelManager creates and returns a new ChannelManager instance.
func NewChannelManager(
kv kv.TxnKV,
kv kv.MetaKv, // for TxnKv and MetaKv
h Handler,
options ...ChannelManagerOpt,
) (*ChannelManager, error) {
c := &ChannelManager{
h: h,
factory: NewChannelPolicyFactoryV1(kv),
store: NewChannelStore(kv),
h: h,
factory: NewChannelPolicyFactoryV1(kv),
store: NewChannelStore(kv),
stateTimer: newChannelStateTimer(kv),
}
if err := c.store.Reload(); err != nil {
@ -98,7 +111,7 @@ func NewChannelManager(
}
// Startup adjusts the channel store according to current cluster states.
func (c *ChannelManager) Startup(nodes []int64) error {
func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
channels := c.store.GetNodesChannels()
// Retrieve the current old nodes.
oNodes := make([]int64, 0, len(channels))
@ -106,6 +119,11 @@ func (c *ChannelManager) Startup(nodes []int64) error {
oNodes = append(oNodes, c.NodeID)
}
// Process watch states for old nodes.
if err := c.checkOldNodes(oNodes); err != nil {
return err
}
// Add new online nodes to the cluster.
newOnLines := c.getNewOnLines(nodes, oNodes)
for _, n := range newOnLines {
@ -125,6 +143,13 @@ func (c *ChannelManager) Startup(nodes []int64) error {
// Unwatch and drop channel with drop flag.
c.unwatchDroppedChannels()
if c.stateChecker != nil {
ctx1, cancel := context.WithCancel(ctx)
c.stopChecker = cancel
go c.stateChecker(ctx1)
log.Debug("starting etcd states checker")
}
log.Info("cluster start up",
zap.Any("nodes", nodes),
zap.Any("oNodes", oNodes),
@ -133,6 +158,50 @@ func (c *ChannelManager) Startup(nodes []int64) error {
return nil
}
// checkOldNodes processes the existing watch channels when starting up.
// ToWatch         get startTs and timeoutTs, start timer
// WatchSuccess    ignore
// WatchFailure    ToRelease
// ToRelease       get startTs and timeoutTs, start timer
// ReleaseSuccess  remove
// ReleaseFailure  clean up and remove
func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
for _, nodeID := range nodes {
watchInfos, err := c.stateTimer.loadAllChannels(nodeID)
if err != nil {
return err
}
for _, info := range watchInfos {
channelName := info.GetVchan().GetChannelName()
switch info.GetState() {
case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete:
c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, info.GetTimeoutTs())
case datapb.ChannelWatchState_WatchFailure:
if err := c.Release(nodeID, channelName); err != nil {
return err
}
case datapb.ChannelWatchState_ToRelease:
c.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, channelName, nodeID, info.GetTimeoutTs())
case datapb.ChannelWatchState_ReleaseSuccess:
if err := c.toDelete(nodeID, channelName); err != nil {
return err
}
case datapb.ChannelWatchState_ReleaseFailure:
if err := c.cleanUpAndDelete(nodeID, channelName); err != nil {
return err
}
}
}
}
return nil
}
// unwatchDroppedChannels removes channels that are marked to drop.
func (c *ChannelManager) unwatchDroppedChannels() {
nodeChannels := c.store.GetNodesChannels()
@ -224,20 +293,21 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
c.store.Add(nodeID)
// the default registerPolicy doesn't reassign channels already there
updates := c.registerPolicy(c.store, nodeID)
if len(updates) <= 0 {
return nil
}
log.Info("register node",
zap.Int64("registered node", nodeID),
zap.Array("updates", updates))
for _, op := range updates {
if op.Type == Add {
c.fillChannelWatchInfo(op)
}
}
return c.store.Update(updates)
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}
// DeleteNode deletes the node from the cluster.
// DeleteNode deletes the nodeID's watchInfos in etcd and reassigns the channels to other nodes
func (c *ChannelManager) DeleteNode(nodeID int64) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -254,12 +324,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
zap.Int64("unregistered node", nodeID),
zap.Array("updates", updates))
for _, v := range updates {
if v.Type == Add {
c.fillChannelWatchInfo(v)
}
}
if err := c.store.Update(updates); err != nil {
if err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch); err != nil {
return err
}
_, err := c.store.Delete(nodeID)
@ -312,24 +377,16 @@ func (c *ChannelManager) Watch(ch *channel) error {
if len(updates) == 0 {
return nil
}
log.Info("watch channel",
log.Info("try to update channel watch info with ToWatch state",
zap.Any("channel", ch),
zap.Array("updates", updates))
for _, v := range updates {
if v.Type == Add {
c.fillChannelWatchInfo(v)
}
}
err := c.store.Update(updates)
err := c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
if err != nil {
log.Error("ChannelManager RWChannelStore update failed", zap.Int64("collectionID", ch.CollectionID),
zap.String("channelName", ch.Name), zap.Error(err))
return err
log.Warn("fail to update channel watch info with ToWatch state",
zap.Any("channel", ch), zap.Array("updates", updates), zap.Error(err))
}
log.Info("ChannelManager RWChannelStore update success", zap.Int64("collectionID", ch.CollectionID),
zap.String("channelName", ch.Name))
return nil
return err
}
// fillChannelWatchInfo updates the channel op by filling in channel watch info.
@ -346,6 +403,31 @@ func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
}
}
// fillChannelWatchInfoWithState updates the channel op by filling in channel watch info with the given state, and returns the channel names whose timers were started.
func (c *ChannelManager) fillChannelWatchInfoWithState(op *ChannelOp, state datapb.ChannelWatchState) []string {
var channelsWithTimer = []string{}
startTs := time.Now().Unix()
timeoutTs := time.Now().Add(maxWatchDuration).UnixNano()
for _, ch := range op.Channels {
vcInfo := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vcInfo,
StartTs: startTs,
State: state,
TimeoutTs: timeoutTs,
}
// Only set timer for watchInfo not from bufferID
if op.NodeID != bufferID {
c.stateTimer.startOne(state, ch.Name, op.NodeID, timeoutTs)
channelsWithTimer = append(channelsWithTimer, ch.Name)
}
op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
}
return channelsWithTimer
}
// GetChannels gets channels info of registered nodes.
func (c *ChannelManager) GetChannels() []*NodeChannelInfo {
c.mu.RLock()
@ -438,3 +520,270 @@ func (c *ChannelManager) findChannel(channelName string) (int64, *channel) {
}
return 0, nil
}
type ackType = int
const (
invalidAck = iota
watchSuccessAck
watchFailAck
watchTimeoutAck
releaseSuccessAck
releaseFailAck
releaseTimeoutAck
)
type ackEvent struct {
ackType ackType
channelName string
nodeID UniqueID
}
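// updateWithTimer fills watch infos with the given state for Add ops, starts their timers, and persists the updates; the timers are removed again if the store update fails.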
func (c *ChannelManager) updateWithTimer(updates ChannelOpSet, state datapb.ChannelWatchState) error {
var channelsWithTimer = []string{}
for _, op := range updates {
if op.Type == Add {
channelsWithTimer = append(channelsWithTimer, c.fillChannelWatchInfoWithState(op, state)...)
}
}
err := c.store.Update(updates)
if err != nil {
log.Warn("fail to update", zap.Array("updates", updates))
c.stateTimer.removeTimers(channelsWithTimer)
}
return err
}
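// processAck reacts to an ackEvent following the channel watch state machine.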
func (c *ChannelManager) processAck(e *ackEvent) {
c.stateTimer.stopIfExsit(e)
switch e.ackType {
case invalidAck:
log.Warn("detected invalid Ack", zap.String("channel name", e.channelName))
case watchSuccessAck:
log.Info("datanode successfully watched channel", zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
case watchFailAck, watchTimeoutAck: // failure acks from toWatch
err := c.Release(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to set channels to release for watch failure ACKs",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
}
case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease
err := c.cleanUpAndDelete(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to clean and delete channels for release failure ACKs",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
}
case releaseSuccessAck:
err := c.toDelete(e.nodeID, e.channelName)
if err != nil {
log.Warn("fail to response to release success ACK",
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
}
}
}
// cleanUpAndDelete tries to clean up the datanode's subscription, and then deletes the channel watch info.
func (c *ChannelManager) cleanUpAndDelete(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
chToCleanUp := c.getChannelByNodeAndName(nodeID, channelName)
if chToCleanUp == nil {
return fmt.Errorf("failed to find matching channel: %s and node: %d", channelName, nodeID)
}
if c.msgstreamFactory == nil {
log.Warn("msgstream factory is not set, unable to clean up topics")
} else {
subName := buildSubName(chToCleanUp.CollectionID, nodeID)
err := c.unsubscribe(subName, channelName)
if err != nil {
log.Warn("failed to unsubscribe topic", zap.String("subcription", subName), zap.String("channel name", channelName), zap.Error(err))
}
}
if !c.isMarkedDrop(channelName) {
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
// reassign policy won't choose the same node when reassigning a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add channel to buffer", zap.String("channel name", channelName))
updates.Add(bufferID, []*channel{chToCleanUp})
}
err := c.remove(nodeID, chToCleanUp)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}
log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}
err := c.remove(nodeID, chToCleanUp)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
}
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
return nil
}
type channelStateChecker func(context.Context)
func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
defer logutil.LogPanic()
// REF MEP#7 watchInfo paths are organized as: [prefix]/channel/{node_id}/{channel_name}
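// e.g. .../101/by-dev-rootcoord-dml_0 for node 101 (IDs and names illustrative)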
watchPrefix := Params.DataCoordCfg.ChannelWatchSubPath
etcdWatcher, timeoutWatcher := c.stateTimer.getWatchers(watchPrefix)
for {
select {
case <-ctx.Done():
log.Info("watch etcd loop quit")
return
case ackEvent := <-timeoutWatcher:
log.Debug("receive timeout acks from state watcher",
zap.Int64("nodeID", ackEvent.nodeID), zap.String("channel name", ackEvent.channelName))
c.processAck(ackEvent)
case event := <-etcdWatcher:
if event.Canceled {
log.Warn("watch channel canceled", zap.Error(event.Err()))
// https://github.com/etcd-io/etcd/issues/8980
if event.Err() == v3rpc.ErrCompacted {
go c.watchChannelStatesLoop(ctx)
return
}
// if the watch loop returns due to event canceled, the datacoord is not functional anymore
log.Panic("datacoord is not functional for event canceled")
}
for _, evt := range event.Events {
if evt.Type == clientv3.EventTypeDelete {
continue
}
key := string(evt.Kv.Key)
watchInfo, err := parseWatchInfo(key, evt.Kv.Value)
if err != nil {
log.Warn("fail to parse watch info", zap.Error(err))
continue
}
// ignore these states
state := watchInfo.GetState()
if state == datapb.ChannelWatchState_ToWatch ||
state == datapb.ChannelWatchState_ToRelease ||
state == datapb.ChannelWatchState_Uncomplete {
continue
}
nodeID, err := parseNodeKey(key)
if err != nil {
log.Warn("fail to parse node from key", zap.String("key", key), zap.Error(err))
continue
}
ackEvent := parseAckEvent(nodeID, watchInfo)
c.processAck(ackEvent)
}
}
}
}
// Release writes a ToRelease channel watch state for a channel
func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
toReleaseChannel := c.getChannelByNodeAndName(nodeID, channelName)
if toReleaseChannel == nil {
return fmt.Errorf("fail to find matching nodID: %d with channelName: %s", nodeID, channelName)
}
toReleaseUpdates := getReleaseOp(nodeID, toReleaseChannel)
err := c.updateWithTimer(toReleaseUpdates, datapb.ChannelWatchState_ToRelease)
if err != nil {
log.Debug("fail to update to release with timer", zap.Array("to release updates", toReleaseUpdates))
}
return err
}
// toDelete removes the channel assignment from a datanode, reassigning it unless the channel is marked to drop
func (c *ChannelManager) toDelete(nodeID UniqueID, channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
ch := c.getChannelByNodeAndName(nodeID, channelName)
if ch == nil {
return fmt.Errorf("fail to find matching nodID: %d with channelName: %s", nodeID, channelName)
}
if !c.isMarkedDrop(channelName) {
reallocates := &NodeChannelInfo{nodeID, []*channel{ch}}
// reassign policy won't choose the same node when reassigning a channel
updates := c.reassignPolicy(c.store, []*NodeChannelInfo{reallocates})
if len(updates) <= 0 {
log.Warn("fail to reassign channel to other nodes, add to the buffer", zap.String("channel name", channelName))
updates.Add(bufferID, []*channel{ch})
}
err := c.remove(nodeID, ch)
if err != nil {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
log.Info("channel manager reassign channels", zap.Int64("old node ID", nodeID), zap.Array("updates", updates))
return c.updateWithTimer(updates, datapb.ChannelWatchState_ToWatch)
}
err := c.remove(nodeID, ch)
if err != nil {
return err
}
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
c.h.FinishDropChannel(channelName)
log.Info("removed channel assignment", zap.Any("channel", ch))
return nil
}
func (c *ChannelManager) getChannelByNodeAndName(nodeID UniqueID, channelName string) *channel {
var ret *channel
nodeChannelInfo := c.store.GetNode(nodeID)
if nodeChannelInfo == nil {
return nil
}
for _, channel := range nodeChannelInfo.Channels {
if channel.Name == channelName {
ret = channel
break
}
}
return ret
}
func (c *ChannelManager) isMarkedDrop(channelName string) bool {
return c.h.CheckShouldDropChannel(channelName)
}
func getReleaseOp(nodeID UniqueID, ch *channel) ChannelOpSet {
var op ChannelOpSet
op.Add(nodeID, []*channel{ch})
return op
}
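
A hedged sketch of how a server wires this together (mirrors the server.go
hunk later in this commit; ctx, kvClient, handler, msFactory, and nodeIDs are
assumed to exist):

cm, err := NewChannelManager(kvClient, handler,
    withMsgstreamFactory(msFactory), // lets cleanUpAndDelete unsubscribe topics
    withStateChecker())              // enables watchChannelStatesLoop
if err != nil {
    return err
}
// Startup reconciles existing watch infos via checkOldNodes and, because
// withStateChecker was set, spawns the etcd/timeout ack checker loop.
return cm.Startup(ctx, nodeIDs)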


@ -17,19 +17,709 @@
package datacoord
import (
"context"
"os"
"path"
"strconv"
"sync"
"testing"
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"stathat.com/c/consistent"
)
func TestReload(t *testing.T) {
func checkWatchInfoWithState(t *testing.T, kv kv.MetaKv, state datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) {
prefix := Params.DataCoordCfg.ChannelWatchSubPath
info, err := kv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName))
assert.NoError(t, err)
assert.NotNil(t, info)
watchInfo, err := parseWatchInfo("fakeKey", []byte(info))
assert.NoError(t, err)
assert.Equal(t, watchInfo.GetState(), state)
assert.Equal(t, watchInfo.Vchan.GetChannelName(), channelName)
assert.Equal(t, watchInfo.Vchan.GetCollectionID(), collectionID)
}
func getOpsWithWatchInfo(nodeID UniqueID, ch *channel) ChannelOpSet {
var ops ChannelOpSet
ops.Add(nodeID, []*channel{ch})
for _, op := range ops {
op.ChannelWatchInfos = []*datapb.ChannelWatchInfo{{}}
}
return ops
}
func TestChannelManager_StateTransfer(t *testing.T) {
metakv := getMetaKv(t)
defer func() {
metakv.RemoveWithPrefix("")
metakv.Close()
}()
p := "/tmp/milvus_ut/rdb_data"
os.Setenv("ROCKSMQ_PATH", p)
prefix := Params.DataCoordCfg.ChannelWatchSubPath
var (
collectionID = UniqueID(9)
nodeID = UniqueID(119)
channel1 = "channel1"
)
getWatchInfoWithState := func(state datapb.ChannelWatchState) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: channel1,
},
State: state,
}
}
t.Run("toWatch-WatchSuccess", func(t *testing.T) {
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
wg.Done()
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{channel1, collectionID})
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchSuccess))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
// TODO: cancel could arrive before the etcd watch event
// if etcd has poor response latency.
time.Sleep(time.Second)
cancel()
wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_WatchSuccess, nodeID, channel1, collectionID)
})
t.Run("ToWatch-WatchFail-ToRelease", func(t *testing.T) {
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
wg.Done()
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{channel1, collectionID})
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
// TODO: cancel could arrive before the etcd watch event
// if etcd has poor response latency.
time.Sleep(time.Second)
cancel()
wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
})
t.Run("ToWatch-Timeout", func(t *testing.T) {
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
wg.Done()
}()
chManager.AddNode(nodeID)
chManager.Watch(&channel{channel1, collectionID})
// simulating the timeout behavior of startOne, because 20s is a long wait
e := &ackEvent{
ackType: watchTimeoutAck,
channelName: channel1,
nodeID: nodeID,
}
chManager.stateTimer.notifyTimeoutWatcher(e)
chManager.stateTimer.stopIfExsit(e)
// TODO: cancel could arrive before the etcd watch event
// if etcd has poor response latency.
time.Sleep(time.Second)
cancel()
wg.Wait()
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channel1, collectionID)
})
t.Run("toRelease-ReleaseSuccess-Delete-reassign-ToWatch", func(t *testing.T) {
var oldNode = UniqueID(120)
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
wg.Done()
}()
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
}},
},
}
err = chManager.Release(nodeID, channel1)
assert.NoError(t, err)
chManager.AddNode(oldNode)
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
// TODO: cancel could arrive before the etcd watch event
// if etcd has poor response latency.
time.Sleep(time.Second)
cancel()
wg.Wait()
w, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10)))
assert.Error(t, err)
assert.Empty(t, w)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID)
})
t.Run("toRelease-ReleaseFail-CleanUpAndDelete-Reassign-ToWatch", func(t *testing.T) {
var oldNode = UniqueID(121)
metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
factory := msgstream.NewRmsFactory()
_, err := factory.NewMsgStream(context.TODO())
require.NoError(t, err)
chManager, err := NewChannelManager(metakv, newMockHandler(), withMsgstreamFactory(factory))
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
chManager.watchChannelStatesLoop(ctx)
wg.Done()
}()
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
}},
},
}
err = chManager.Release(nodeID, channel1)
assert.NoError(t, err)
chManager.AddNode(oldNode)
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseFailure))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channel1), string(data))
require.NoError(t, err)
// TODO: cancel could arrive before the etcd watch event
// if etcd has poor response latency.
time.Sleep(time.Second)
cancel()
wg.Wait()
w, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10)))
assert.Error(t, err)
assert.Empty(t, w)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, oldNode, channel1, collectionID)
})
}
func TestChannelManager(t *testing.T) {
metakv := getMetaKv(t)
defer func() {
metakv.RemoveWithPrefix("")
metakv.Close()
}()
prefix := Params.DataCoordCfg.ChannelWatchSubPath
t.Run("test AddNode", func(t *testing.T) {
// Note: this test is based on the default registerPolicy
defer metakv.RemoveWithPrefix("")
var (
collectionID = UniqueID(8)
nodeID, nodeToAdd = UniqueID(118), UniqueID(811)
channel1, channel2 = "channel1", "channel2"
)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{
{channel1, collectionID},
{channel2, collectionID},
}},
},
}
err = chManager.AddNode(nodeToAdd)
assert.NoError(t, err)
chInfo := chManager.store.GetNode(nodeID)
assert.Equal(t, 2, len(chInfo.Channels))
chInfo = chManager.store.GetNode(nodeToAdd)
assert.Equal(t, 0, len(chInfo.Channels))
err = chManager.Watch(&channel{"channel-3", collectionID})
assert.NoError(t, err)
chInfo = chManager.store.GetNode(nodeToAdd)
assert.Equal(t, 1, len(chInfo.Channels))
chManager.stateTimer.removeTimers([]string{"channel-3"})
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeToAdd, "channel-3", collectionID)
})
t.Run("test Watch", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var (
collectionID = UniqueID(7)
nodeID = UniqueID(117)
bufferCh = "bufferID"
chanToAdd = "new-channel-watch"
)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
err = chManager.Watch(&channel{bufferCh, collectionID})
assert.NoError(t, err)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, bufferCh, collectionID)
chManager.store.Add(nodeID)
err = chManager.Watch(&channel{chanToAdd, collectionID})
assert.NoError(t, err)
chManager.stateTimer.removeTimers([]string{chanToAdd})
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID)
})
t.Run("test Release", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var (
collectionID = UniqueID(4)
nodeID, invalidNodeID = UniqueID(114), UniqueID(999)
channelName, invalidChName = "to-release", "invalid-to-release"
)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}},
},
}
err = chManager.Release(invalidNodeID, invalidChName)
assert.Error(t, err)
err = chManager.Release(nodeID, channelName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, channelName, nodeID})
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID)
})
t.Run("test toDelete", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var collectionID = UniqueID(5)
tests := []struct {
isvalid bool
nodeID UniqueID
chName string
}{
{true, UniqueID(125), "normal-chan"},
{true, UniqueID(115), "to-delete-chan"},
{false, UniqueID(9), "invalid-chan"},
}
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
// prepare tests
for _, test := range tests {
if test.isvalid {
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)
info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName))
require.NoError(t, err)
require.NotNil(t, info)
}
}
remainTest, reassignTest := tests[0], tests[1]
err = chManager.toDelete(reassignTest.nodeID, reassignTest.chName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
// test no nodes are removed from store
nodesID := chManager.store.GetNodes()
assert.Equal(t, 2, len(nodesID))
// test the node of reassignTest contains no channels
nodeChanInfo := chManager.store.GetNode(reassignTest.nodeID)
assert.Equal(t, 0, len(nodeChanInfo.Channels))
// test all channels are assigned to the node of remainTest
nodeChanInfo = chManager.store.GetNode(remainTest.nodeID)
assert.Equal(t, 2, len(nodeChanInfo.Channels))
assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels)
// Delete the node of reassignTest, then call toDelete on the node in remainTest
err = chManager.DeleteNode(reassignTest.nodeID)
require.NoError(t, err)
err = chManager.toDelete(remainTest.nodeID, remainTest.chName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
// channel is added to bufferID because there's only one node left
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, remainTest.chName, collectionID)
})
t.Run("test cleanUpAndDelete", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var collectionID = UniqueID(6)
tests := []struct {
isvalid bool
nodeID UniqueID
chName string
}{
{true, UniqueID(126), "normal-chan"},
{true, UniqueID(116), "to-delete-chan"},
{false, UniqueID(9), "invalid-chan"},
}
factory := msgstream.NewRmsFactory()
_, err := factory.NewMsgStream(context.TODO())
require.NoError(t, err)
chManager, err := NewChannelManager(metakv, newMockHandler(), withMsgstreamFactory(factory))
require.NoError(t, err)
// prepare tests
for _, test := range tests {
if test.isvalid {
chManager.store.Add(test.nodeID)
ops := getOpsWithWatchInfo(test.nodeID, &channel{test.chName, collectionID})
err = chManager.store.Update(ops)
require.NoError(t, err)
info, err := metakv.Load(path.Join(prefix, strconv.FormatInt(test.nodeID, 10), test.chName))
require.NoError(t, err)
require.NotNil(t, info)
}
}
remainTest, reassignTest := tests[0], tests[1]
err = chManager.cleanUpAndDelete(reassignTest.nodeID, reassignTest.chName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
// test no nodes are removed from store
nodesID := chManager.store.GetNodes()
assert.Equal(t, 2, len(nodesID))
// test the node of reassignTest contains no channels
nodeChanInfo := chManager.store.GetNode(reassignTest.nodeID)
assert.Equal(t, 0, len(nodeChanInfo.Channels))
// test all channels are assigned to the node of remainTest
nodeChanInfo = chManager.store.GetNode(remainTest.nodeID)
assert.Equal(t, 2, len(nodeChanInfo.Channels))
assert.ElementsMatch(t, []*channel{{remainTest.chName, collectionID}, {reassignTest.chName, collectionID}}, nodeChanInfo.Channels)
// Delete the node of reassignTest, then call cleanUpAndDelete on the node in remainTest
err = chManager.DeleteNode(reassignTest.nodeID)
require.NoError(t, err)
err = chManager.cleanUpAndDelete(remainTest.nodeID, remainTest.chName)
assert.NoError(t, err)
chManager.stateTimer.stopIfExsit(&ackEvent{releaseSuccessAck, reassignTest.chName, reassignTest.nodeID})
// channel is added to bufferID because there's only one node left
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, bufferID, remainTest.chName, collectionID)
})
t.Run("test getChannelByNodeAndName", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var (
nodeID = UniqueID(113)
collectionID = UniqueID(3)
channelName = "get-channel-by-node-and-name"
)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
ch := chManager.getChannelByNodeAndName(nodeID, channelName)
assert.Nil(t, ch)
chManager.store.Add(nodeID)
ch = chManager.getChannelByNodeAndName(nodeID, channelName)
assert.Nil(t, ch)
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}},
},
}
ch = chManager.getChannelByNodeAndName(nodeID, channelName)
assert.NotNil(t, ch)
assert.Equal(t, collectionID, ch.CollectionID)
assert.Equal(t, channelName, ch.Name)
})
t.Run("test fillChannelWatchInfoWithState", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
var (
nodeID = UniqueID(111)
collectionID = UniqueID(1)
channelName = "fill-channel-watchInfo-with-state"
)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
tests := []struct {
inState datapb.ChannelWatchState
description string
}{
{datapb.ChannelWatchState_ToWatch, "fill toWatch state"},
{datapb.ChannelWatchState_ToRelease, "fill toRelase state"},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ops := getReleaseOp(nodeID, &channel{channelName, collectionID})
for _, op := range ops {
chs := chManager.fillChannelWatchInfoWithState(op, test.inState)
assert.Equal(t, 1, len(chs))
assert.Equal(t, channelName, chs[0])
assert.Equal(t, 1, len(op.ChannelWatchInfos))
assert.Equal(t, test.inState, op.ChannelWatchInfos[0].GetState())
chManager.stateTimer.removeTimers(chs)
}
})
}
})
t.Run("test updateWithTimer", func(t *testing.T) {
var (
nodeID = UniqueID(112)
collectionID = UniqueID(2)
channelName = "update-with-timer"
)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
chManager.store.Add(nodeID)
opSet := getReleaseOp(nodeID, &channel{channelName, collectionID})
chManager.updateWithTimer(opSet, datapb.ChannelWatchState_ToWatch)
chManager.stateTimer.stopIfExsit(&ackEvent{watchSuccessAck, channelName, nodeID})
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, nodeID, channelName, collectionID)
})
}
func TestChannelManager_Reload(t *testing.T) {
metakv := getMetaKv(t)
defer func() {
metakv.RemoveWithPrefix("")
metakv.Close()
}()
var (
nodeID = UniqueID(200)
collectionID = UniqueID(2)
channelName = "channel-checkOldNodes"
)
prefix := Params.DataCoordCfg.ChannelWatchSubPath
getWatchInfoWithState := func(state datapb.ChannelWatchState) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: channelName,
},
State: state,
TimeoutTs: time.Now().Add(20 * time.Second).UnixNano(),
}
}
t.Run("test checkOldNodes", func(t *testing.T) {
metakv.RemoveWithPrefix("")
t.Run("ToWatch", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToWatch))
require.NoError(t, err)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data))
require.NoError(t, err)
chManager.checkOldNodes([]UniqueID{nodeID})
_, ok := chManager.stateTimer.runningTimers.Load(channelName)
assert.True(t, ok)
chManager.stateTimer.removeTimers([]string{channelName})
})
t.Run("ToRelease", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ToRelease))
require.NoError(t, err)
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data))
require.NoError(t, err)
err = chManager.checkOldNodes([]UniqueID{nodeID})
assert.NoError(t, err)
_, ok := chManager.stateTimer.runningTimers.Load(channelName)
assert.True(t, ok)
chManager.stateTimer.removeTimers([]string{channelName})
})
t.Run("WatchFail", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}}},
}
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_WatchFailure))
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data))
require.NoError(t, err)
err = chManager.checkOldNodes([]UniqueID{nodeID})
assert.NoError(t, err)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToRelease, nodeID, channelName, collectionID)
chManager.stateTimer.removeTimers([]string{channelName})
})
t.Run("ReleaseSuccess", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess))
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}}},
}
chManager.AddNode(bufferID)
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data))
require.NoError(t, err)
err = chManager.checkOldNodes([]UniqueID{nodeID})
assert.NoError(t, err)
v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10)))
assert.Error(t, err)
assert.Empty(t, v)
})
t.Run("ReleaseFail", func(t *testing.T) {
defer metakv.RemoveWithPrefix("")
chManager, err := NewChannelManager(metakv, newMockHandler())
require.NoError(t, err)
data, err := proto.Marshal(getWatchInfoWithState(datapb.ChannelWatchState_ReleaseSuccess))
chManager.store = &ChannelStore{
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
nodeID: {nodeID, []*channel{{channelName, collectionID}}},
999: {999, []*channel{}},
},
}
require.NoError(t, err)
err = metakv.Save(path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName), string(data))
require.NoError(t, err)
err = chManager.checkOldNodes([]UniqueID{nodeID, 999})
assert.NoError(t, err)
time.Sleep(time.Second)
v, err := metakv.Load(path.Join(prefix, strconv.FormatInt(nodeID, 10)))
assert.Error(t, err)
assert.Empty(t, v)
checkWatchInfoWithState(t, metakv, datapb.ChannelWatchState_ToWatch, 999, channelName, collectionID)
})
})
t.Run("test reload with data", func(t *testing.T) {
Params.Init()
kv := memkv.NewMemoryKV()
defer metakv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
hash := consistent.New()
cm, err := NewChannelManager(kv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash)))
cm, err := NewChannelManager(metakv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash)))
assert.Nil(t, err)
assert.Nil(t, cm.AddNode(1))
assert.Nil(t, cm.AddNode(2))
@ -37,9 +727,9 @@ func TestReload(t *testing.T) {
assert.Nil(t, cm.Watch(&channel{"channel2", 1}))
hash2 := consistent.New()
cm2, err := NewChannelManager(kv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash2)))
cm2, err := NewChannelManager(metakv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash2)))
assert.Nil(t, err)
assert.Nil(t, cm2.Startup([]int64{1, 2}))
assert.Nil(t, cm2.Startup(ctx, []int64{1, 2}))
assert.Nil(t, cm2.AddNode(3))
assert.True(t, cm2.Match(3, "channel1"))
assert.True(t, cm2.Match(3, "channel2"))
@ -47,6 +737,12 @@ func TestReload(t *testing.T) {
}
func TestChannelManager_RemoveChannel(t *testing.T) {
metakv := getMetaKv(t)
defer func() {
metakv.RemoveWithPrefix("")
metakv.Close()
}()
type fields struct {
store RWChannelStore
}
@ -63,7 +759,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) {
"test remove existed channel",
fields{
store: &ChannelStore{
store: memkv.NewMemoryKV(),
store: metakv,
channelsInfo: map[int64]*NodeChannelInfo{
1: {
NodeID: 1,


@ -44,7 +44,7 @@ func NewCluster(sessionManager *SessionManager, channelManager *ChannelManager)
}
// Startup inits the cluster with the given data nodes.
func (c *Cluster) Startup(nodes []*NodeInfo) error {
func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error {
for _, node := range nodes {
c.sessionManager.AddSession(node)
}
@ -52,7 +52,7 @@ func (c *Cluster) Startup(nodes []*NodeInfo) error {
for _, node := range nodes {
currs = append(currs, node.NodeID)
}
return c.channelManager.Startup(currs)
return c.channelManager.Startup(ctx, currs)
}
// Register registers a new node in cluster


@ -23,16 +23,35 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"stathat.com/c/consistent"
)
func getMetaKv(t *testing.T) kv.MetaKv {
Params.Init()
rootPath := "/etcd/test/root/" + t.Name()
metakv, err := etcdkv.NewMetaKvFactory(rootPath, &Params.EtcdCfg)
require.NoError(t, err)
return metakv
}
func TestClusterCreate(t *testing.T) {
kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
t.Run("startup normally", func(t *testing.T) {
kv := memkv.NewMemoryKV()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -44,7 +63,7 @@ func TestClusterCreate(t *testing.T) {
Address: addr,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(nodes)
err = cluster.Startup(ctx, nodes)
assert.Nil(t, err)
dataNodes := sessionManager.GetSessions()
assert.EqualValues(t, 1, len(dataNodes))
@ -52,9 +71,12 @@ func TestClusterCreate(t *testing.T) {
})
t.Run("startup with existed channel data", func(t *testing.T) {
Params.Init()
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var err error
kv := memkv.NewMemoryKV()
info1 := &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
CollectionID: 1,
@ -72,7 +94,7 @@ func TestClusterCreate(t *testing.T) {
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
err = cluster.Startup([]*NodeInfo{{NodeID: 1, Address: "localhost:9999"}})
err = cluster.Startup(ctx, []*NodeInfo{{NodeID: 1, Address: "localhost:9999"}})
assert.Nil(t, err)
channels := channelManager.GetChannels()
@ -80,7 +102,11 @@ func TestClusterCreate(t *testing.T) {
})
t.Run("remove all nodes and restart with other nodes", func(t *testing.T) {
kv := memkv.NewMemoryKV()
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -92,7 +118,7 @@ func TestClusterCreate(t *testing.T) {
Address: addr,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(nodes)
err = cluster.Startup(ctx, nodes)
assert.Nil(t, err)
err = cluster.UnRegister(info)
@ -114,7 +140,7 @@ func TestClusterCreate(t *testing.T) {
Address: addr,
}
nodes = []*NodeInfo{info}
err = clusterReload.Startup(nodes)
err = clusterReload.Startup(ctx, nodes)
assert.Nil(t, err)
sessions = sessionManager2.GetSessions()
assert.EqualValues(t, 1, len(sessions))
@ -126,8 +152,9 @@ func TestClusterCreate(t *testing.T) {
})
t.Run("loadKv Fails", func(t *testing.T) {
kv := memkv.NewMemoryKV()
fkv := &loadPrefixFailKV{TxnKV: kv}
defer kv.RemoveWithPrefix("")
fkv := &loadPrefixFailKV{MetaKv: kv}
_, err := NewChannelManager(fkv, newMockHandler())
assert.NotNil(t, err)
})
@ -135,7 +162,7 @@ func TestClusterCreate(t *testing.T) {
// a mock kv that always fail when LoadWithPrefix
type loadPrefixFailKV struct {
kv.TxnKV
kv.MetaKv
}
// LoadWithPrefix override behavior
@ -144,15 +171,25 @@ func (kv *loadPrefixFailKV) LoadWithPrefix(key string) ([]string, []string, erro
}
func TestRegister(t *testing.T) {
kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
t.Run("register to empty cluster", func(t *testing.T) {
kv := memkv.NewMemoryKV()
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
err = cluster.Startup(nil)
err = cluster.Startup(ctx, nil)
assert.Nil(t, err)
info := &NodeInfo{
NodeID: 1,
@ -166,7 +203,11 @@ func TestRegister(t *testing.T) {
})
t.Run("register to empty cluster with buffer channels", func(t *testing.T) {
kv := memkv.NewMemoryKV()
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -178,7 +219,7 @@ func TestRegister(t *testing.T) {
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
err = cluster.Startup(nil)
err = cluster.Startup(ctx, nil)
assert.Nil(t, err)
info := &NodeInfo{
NodeID: 1,
@ -195,13 +236,17 @@ func TestRegister(t *testing.T) {
})
t.Run("register and restart with no channel", func(t *testing.T) {
kv := memkv.NewMemoryKV()
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
cluster := NewCluster(sessionManager, channelManager)
addr := "localhost:8080"
err = cluster.Startup(nil)
err = cluster.Startup(ctx, nil)
assert.Nil(t, err)
info := &NodeInfo{
NodeID: 1,
@ -222,8 +267,18 @@ func TestRegister(t *testing.T) {
}
func TestUnregister(t *testing.T) {
kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
t.Run("remove node after unregister", func(t *testing.T) {
kv := memkv.NewMemoryKV()
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -235,7 +290,7 @@ func TestUnregister(t *testing.T) {
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(nodes)
err = cluster.Startup(ctx, nodes)
assert.Nil(t, err)
err = cluster.UnRegister(nodes[0])
assert.Nil(t, err)
@ -244,7 +299,11 @@ func TestUnregister(t *testing.T) {
})
t.Run("move channels to online nodes after unregister", func(t *testing.T) {
kv := memkv.NewMemoryKV()
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -260,7 +319,7 @@ func TestUnregister(t *testing.T) {
NodeID: 2,
}
nodes := []*NodeInfo{nodeInfo1, nodeInfo2}
err = cluster.Startup(nodes)
err = cluster.Startup(ctx, nodes)
assert.Nil(t, err)
err = cluster.Watch("ch1", 1)
assert.Nil(t, err)
@ -275,10 +334,14 @@ func TestUnregister(t *testing.T) {
})
t.Run("remove all channels after unregsiter", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var mockSessionCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(1, nil)
}
kv := memkv.NewMemoryKV()
sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -289,7 +352,7 @@ func TestUnregister(t *testing.T) {
Address: "localhost:8080",
NodeID: 1,
}
err = cluster.Startup([]*NodeInfo{nodeInfo})
err = cluster.Startup(ctx, []*NodeInfo{nodeInfo})
assert.Nil(t, err)
err = cluster.Watch("ch_1", 1)
assert.Nil(t, err)
@ -305,11 +368,21 @@ func TestUnregister(t *testing.T) {
}
func TestWatchIfNeeded(t *testing.T) {
kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
t.Run("add deplicated channel to cluster", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var mockSessionCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(1, nil)
}
kv := memkv.NewMemoryKV()
sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -322,7 +395,7 @@ func TestWatchIfNeeded(t *testing.T) {
NodeID: 1,
}
-err = cluster.Startup([]*NodeInfo{info})
+err = cluster.Startup(ctx, []*NodeInfo{info})
assert.Nil(t, err)
err = cluster.Watch("ch1", 1)
assert.Nil(t, err)
@ -332,7 +405,8 @@ func TestWatchIfNeeded(t *testing.T) {
})
t.Run("watch channel to empty cluster", func(t *testing.T) {
kv := memkv.NewMemoryKV()
defer kv.RemoveWithPrefix("")
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -351,7 +425,12 @@ func TestWatchIfNeeded(t *testing.T) {
}
func TestConsistentHashPolicy(t *testing.T) {
-kv := memkv.NewMemoryKV()
+kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
sessionManager := NewSessionManager()
chash := consistent.New()
factory := NewConsistentHashChannelPolicyFactory(chash)
@ -428,7 +507,14 @@ func TestConsistentHashPolicy(t *testing.T) {
}
func TestCluster_Flush(t *testing.T) {
-kv := memkv.NewMemoryKV()
+kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -440,7 +526,7 @@ func TestCluster_Flush(t *testing.T) {
NodeID: 1,
}
nodes := []*NodeInfo{info}
-err = cluster.Startup(nodes)
+err = cluster.Startup(ctx, nodes)
assert.Nil(t, err)
err = cluster.Watch("chan-1", 1)

View File

@ -308,7 +308,7 @@ func (s *Server) initCluster() error {
}
var err error
-s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory))
+s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory), withStateChecker())
if err != nil {
return err
}
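initCluster now opts the ChannelManager into the watch-state checker. Since NewChannelManager already accepts withMsgstreamFactory, withStateChecker presumably follows the same functional-option pattern. A self-contained, illustrative-only version of that pattern (none of these names are the real ChannelManager fields):

package main

import "fmt"

type manager struct{ checkerEnabled bool }

type option func(*manager)

// withStateChecker flips a flag that the manager's startup would read
// to decide whether to launch the background state checker.
func withStateChecker() option {
	return func(m *manager) { m.checkerEnabled = true }
}

func newManager(opts ...option) *manager {
	m := &manager{}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	fmt.Println(newManager(withStateChecker()).checkerEnabled) // true
}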
@ -405,7 +405,7 @@ func (s *Server) initServiceDiscovery() error {
datanodes = append(datanodes, info)
}
-s.cluster.Startup(datanodes)
+s.cluster.Startup(s.ctx, datanodes)
// TODO implement rewatch logic
s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)

View File

@ -31,7 +31,6 @@ import (
"time"
"github.com/milvus-io/milvus/internal/common"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -1967,6 +1966,12 @@ func TestGetCompactionStateWithPlans(t *testing.T) {
}
func TestOptions(t *testing.T) {
kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
t.Run("SetRootCoordCreator", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
@ -1983,7 +1988,8 @@ func TestOptions(t *testing.T) {
assert.NotNil(t, svr.rootCoordClientCreator)
})
t.Run("SetCluster", func(t *testing.T) {
kv := memkv.NewMemoryKV()
defer kv.RemoveWithPrefix("")
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
@ -2031,14 +2037,21 @@ func (p *mockPolicyFactory) NewDeregisterPolicy() DeregisterPolicy {
}
func TestHandleSessionEvent(t *testing.T) {
-kv := memkv.NewMemoryKV()
+kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(&mockPolicyFactory{}))
assert.Nil(t, err)
sessionManager := NewSessionManager()
cluster := NewCluster(sessionManager, channelManager)
assert.Nil(t, err)
-err = cluster.Startup(nil)
+err = cluster.Startup(ctx, nil)
assert.Nil(t, err)
defer cluster.Close()
@ -2285,9 +2298,9 @@ func (ms *MockClosePanicMsgstream) Consume() *msgstream.MsgPack {
}
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
+var err error
Params.Init()
Params.MsgChannelCfg.DataCoordTimeTick = Params.MsgChannelCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
-var err error
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"pulsarAddress": Params.PulsarCfg.Address,
@ -2311,6 +2324,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
assert.Nil(t, err)
err = svr.Init()
assert.Nil(t, err)
@ -2318,6 +2332,12 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se
assert.Nil(t, err)
err = svr.Register()
assert.Nil(t, err)
// Stop channel watch state checker in tests
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
svr.channelManager.stopChecker()
}
return svr
}
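newTestServer stops the checker immediately after startup so unit tests do not race against the background loop. stopChecker is presumably a stored context.CancelFunc paired with the goroutine the manager launches; a self-contained sketch of that wiring, with all names assumed:

package main

import (
	"context"
	"fmt"
	"time"
)

type channelManager struct {
	stopChecker context.CancelFunc
}

// startChecker launches the assumed background loop and stores its cancel
// func so callers (like the test helper above) can stop it early.
func (c *channelManager) startChecker(parent context.Context) {
	ctx, cancel := context.WithCancel(parent)
	c.stopChecker = cancel
	go func() {
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("checker stopped")
				return
			case <-ticker.C:
				// a real checker would inspect channel watch states here
			}
		}
	}()
}

func main() {
	c := &channelManager{}
	c.startChecker(context.Background())
	c.stopChecker()
	time.Sleep(50 * time.Millisecond) // give the goroutine time to exit
}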

View File

@ -434,15 +434,12 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
}
log.Info("DropVChannel plan to remove", zap.String("channel", channel))
-err = s.channelManager.RemoveChannel(channel)
+err = s.channelManager.Release(nodeID, channel)
if err != nil {
-log.Warn("DropVChannel failed to RemoveChannel", zap.String("channel", channel), zap.Error(err))
+log.Warn("DropVChannel failed to ReleaseAndRemove", zap.String("channel", channel), zap.Error(err))
}
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
// clean up removal flag
s.meta.FinishRemoveChannel(channel)
// no compaction triggered in Drop procedure
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
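DropVirtualChannel no longer deletes the channel record itself; it hands the channel back to the ChannelManager so the release is acknowledged by the owning DataNode before anything is removed. An assumed outline of Release (the lookup and persistence helpers are invented for illustration):

func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	ch := c.findChannel(nodeID, channelName) // assumed lookup helper
	if ch == nil {
		return fmt.Errorf("channel %s not found on node %d", channelName, nodeID)
	}
	// Write ToRelease rather than deleting outright; the state checker
	// completes the removal once the DataNode acks or the release times out.
	return c.updateWatchState(ch, datapb.ChannelWatchState_ToRelease) // assumed helper
}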

View File

@ -307,9 +307,8 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
switch e.eventType {
-case putEventType:
-log.Info("DataNode is handling watchInfo put event", zap.String("key", key))
+case putEventType:
watchInfo, err := parsePutEventData(data)
if err != nil {
log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err))
@ -323,6 +322,7 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
e.info = watchInfo
e.vChanName = watchInfo.GetVchan().GetChannelName()
log.Info("DataNode is handling watchInfo put event", zap.String("key", key), zap.String("state", watchInfo.GetState().String()))
case deleteEventType:
log.Info("DataNode is handling watchInfo delete event", zap.String("key", key))
@ -357,7 +357,6 @@ func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
if watchInfo.Vchan == nil {
return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
}
return &watchInfo, nil
}
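Pieced together from the visible context, parsePutEventData now reads roughly as below; the unmarshal branch is inferred from the nil-Vchan check above, so treat its error text as an assumption:

func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
	watchInfo := datapb.ChannelWatchInfo{}
	if err := proto.Unmarshal(data, &watchInfo); err != nil {
		return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err)
	}
	if watchInfo.Vchan == nil {
		return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
	}
	return &watchInfo, nil
}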
@ -369,6 +368,7 @@ func parseDeleteEventKey(key string) string {
func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
vChanName := watchInfo.GetVchan().GetChannelName()
log.Info("handle put event", zap.String("watch state", watchInfo.State.String()), zap.String("vChanName", vChanName))
switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch: