Add ctx parameter and log tracer for watch and selectNodes (#27809)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/27816/head
congqixia 2023-10-20 04:22:11 +08:00 committed by GitHub
parent b6e07d6fe3
commit 49516d44b4
8 changed files with 52 additions and 48 deletions
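
The pattern applied across these files: ChannelManager.Watch, Cluster.Watch, and the proxy's selectNode now take the caller's context.Context and derive their logger from it via log.Ctx(ctx), so watch-related log lines carry the request's trace information instead of going through the bare package-level logger. A minimal, self-contained sketch of the idea follows; loggerFromCtx, ctxLogKey, and the simplified channel/ChannelManager types are illustrative stand-ins, not the actual Milvus code.

```go
package main

import (
	"context"

	"go.uber.org/zap"
)

// ctxLogKey/loggerFromCtx stand in for the project's log.Ctx(ctx) helper:
// recover a request-scoped logger from the context, or fall back to the
// global logger when none is attached.
type ctxLogKey struct{}

func loggerFromCtx(ctx context.Context) *zap.Logger {
	if l, ok := ctx.Value(ctxLogKey{}).(*zap.Logger); ok {
		return l
	}
	return zap.L()
}

type channel struct {
	Name         string
	CollectionID int64
}

type ChannelManager struct{}

// Before this change the signature was Watch(ch *channel) error, so its log
// output could not be tied back to the request that triggered the watch.
func (c *ChannelManager) Watch(ctx context.Context, ch *channel) error {
	log := loggerFromCtx(ctx) // shadows any package-level logger for this call
	log.Info("watching channel",
		zap.String("channel", ch.Name),
		zap.Int64("collectionID", ch.CollectionID))
	return nil
}

func main() {
	// Attach a logger tagged with a (made-up) trace ID to the context; every
	// log line emitted inside Watch now carries that field.
	ctx := context.WithValue(context.Background(), ctxLogKey{},
		zap.NewExample().With(zap.String("traceID", "abc123")))
	_ = (&ChannelManager{}).Watch(ctx, &channel{Name: "ch1", CollectionID: 1})
}
```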


@@ -431,7 +431,8 @@ func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
}
// Watch tries to add the channel to cluster. Watch is a no op if the channel already exists.
-func (c *ChannelManager) Watch(ch *channel) error {
+func (c *ChannelManager) Watch(ctx context.Context, ch *channel) error {
+log := log.Ctx(ctx)
c.mu.Lock()
defer c.mu.Unlock()
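
Note how `log := log.Ctx(ctx)` works: inside Watch the new local `log` shadows the imported log package for the rest of the function, so the existing log.Info/log.Warn calls in the body switch to the context-bound logger without any further edits. A tiny runnable sketch of that shadowing effect, using the standard library and a made-up ctxLogger helper in place of the real log.Ctx:

```go
package main

import (
	"context"
	"log"
)

type traceKey struct{}

// ctxLogger stands in for log.Ctx(ctx): here it just returns a *log.Logger
// whose prefix carries a fake trace ID pulled from the context.
func ctxLogger(ctx context.Context) *log.Logger {
	traceID, _ := ctx.Value(traceKey{}).(string)
	return log.New(log.Writer(), "["+traceID+"] ", log.LstdFlags)
}

func watch(ctx context.Context, channel string) {
	// This local `log` shadows the imported log package for the rest of the
	// function, exactly like `log := log.Ctx(ctx)` in the diff above, so the
	// call below picks up the trace prefix without being rewritten.
	log := ctxLogger(ctx)

	log.Printf("watching channel %s", channel)
}

func main() {
	ctx := context.WithValue(context.Background(), traceKey{}, "abc123")
	watch(ctx, "ch1")
}
```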


@@ -120,7 +120,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
-chManager.Watch(&channel{Name: cName, CollectionID: collectionID})
+chManager.Watch(ctx, &channel{Name: cName, CollectionID: collectionID})
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
@@ -150,7 +150,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
-chManager.Watch(&channel{Name: cName, CollectionID: collectionID})
+chManager.Watch(ctx, &channel{Name: cName, CollectionID: collectionID})
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchFailure)
@@ -181,7 +181,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
}()
chManager.AddNode(nodeID)
-chManager.Watch(&channel{Name: cName, CollectionID: collectionID})
+chManager.Watch(ctx, &channel{Name: cName, CollectionID: collectionID})
// simulating timeout behavior of startOne, cuz 20s is a long wait
e := &ackEvent{
@@ -417,7 +417,7 @@ func TestChannelManager(t *testing.T) {
assert.False(t, chManager.Match(nodeToAdd, channel1))
assert.False(t, chManager.Match(nodeToAdd, channel2))
-err = chManager.Watch(&channel{Name: "channel-3", CollectionID: collectionID})
+err = chManager.Watch(context.TODO(), &channel{Name: "channel-3", CollectionID: collectionID})
assert.NoError(t, err)
assert.True(t, chManager.Match(nodeToAdd, "channel-3"))
@@ -459,7 +459,7 @@ func TestChannelManager(t *testing.T) {
assert.True(t, chManager.Match(nodeID, channel1))
assert.True(t, chManager.Match(nodeID, channel2))
-err = chManager.Watch(&channel{Name: "channel-3", CollectionID: collectionID})
+err = chManager.Watch(context.TODO(), &channel{Name: "channel-3", CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, nodeID, "channel-3", collectionID)
@@ -478,13 +478,13 @@ func TestChannelManager(t *testing.T) {
chManager, err := NewChannelManager(watchkv, newMockHandler())
require.NoError(t, err)
-err = chManager.Watch(&channel{Name: bufferCh, CollectionID: collectionID})
+err = chManager.Watch(context.TODO(), &channel{Name: bufferCh, CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, bufferID, bufferCh, collectionID)
chManager.store.Add(nodeID)
-err = chManager.Watch(&channel{Name: chanToAdd, CollectionID: collectionID})
+err = chManager.Watch(context.TODO(), &channel{Name: chanToAdd, CollectionID: collectionID})
assert.NoError(t, err)
waitAndCheckState(t, watchkv, datapb.ChannelWatchState_ToWatch, nodeID, chanToAdd, collectionID)
@@ -758,7 +758,7 @@ func TestChannelManager(t *testing.T) {
assert.False(t, chManager.stateTimer.hasRunningTimers())
// 3. watch one channel
-chManager.Watch(&channel{Name: cName, CollectionID: collectionID})
+chManager.Watch(ctx, &channel{Name: cName, CollectionID: collectionID})
assert.False(t, chManager.isSilent())
assert.True(t, chManager.stateTimer.hasRunningTimers())
key := path.Join(prefix, strconv.FormatInt(nodeID, 10), cName)
@@ -1016,7 +1016,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
assert.True(t, chManager.Match(2, "channel-1"))
chManager.AddNode(3)
-chManager.Watch(&channel{Name: "channel-4", CollectionID: collectionID})
+chManager.Watch(ctx, &channel{Name: "channel-4", CollectionID: collectionID})
key = path.Join(prefix, "3", "channel-4")
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)


@@ -71,8 +71,8 @@ func (c *Cluster) UnRegister(node *NodeInfo) error {
}
// Watch tries to add a channel in datanode cluster
-func (c *Cluster) Watch(ch string, collectionID UniqueID) error {
-return c.channelManager.Watch(&channel{Name: ch, CollectionID: collectionID})
+func (c *Cluster) Watch(ctx context.Context, ch string, collectionID UniqueID) error {
+return c.channelManager.Watch(ctx, &channel{Name: ch, CollectionID: collectionID})
}
// Flush sends flush requests to dataNodes specified


@@ -235,7 +235,7 @@ func (suite *ClusterSuite) TestRegister() {
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
suite.NoError(err)
-err = channelManager.Watch(&channel{
+err = channelManager.Watch(context.TODO(), &channel{
Name: "ch1",
CollectionID: 0,
})
@@ -347,7 +347,7 @@ func (suite *ClusterSuite) TestUnregister() {
nodes := []*NodeInfo{nodeInfo1, nodeInfo2}
err = cluster.Startup(ctx, nodes)
suite.NoError(err)
-err = cluster.Watch("ch1", 1)
+err = cluster.Watch(ctx, "ch1", 1)
suite.NoError(err)
err = cluster.UnRegister(nodeInfo1)
suite.NoError(err)
@@ -382,7 +382,7 @@ func (suite *ClusterSuite) TestUnregister() {
}
err = cluster.Startup(ctx, []*NodeInfo{nodeInfo})
suite.NoError(err)
-err = cluster.Watch("ch_1", 1)
+err = cluster.Watch(ctx, "ch_1", 1)
suite.NoError(err)
err = cluster.UnRegister(nodeInfo)
suite.NoError(err)
@@ -431,7 +431,7 @@ func TestWatchIfNeeded(t *testing.T) {
err = cluster.Startup(ctx, []*NodeInfo{info})
assert.NoError(t, err)
-err = cluster.Watch("ch1", 1)
+err = cluster.Watch(ctx, "ch1", 1)
assert.NoError(t, err)
channels := channelManager.GetChannels()
assert.EqualValues(t, 1, len(channels))
@@ -441,13 +441,15 @@ func TestWatchIfNeeded(t *testing.T) {
t.Run("watch channel to empty cluster", func(t *testing.T) {
defer kv.RemoveWithPrefix("")
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
-err = cluster.Watch("ch1", 1)
+err = cluster.Watch(ctx, "ch1", 1)
assert.NoError(t, err)
channels := channelManager.GetChannels()
@@ -499,7 +501,7 @@ func TestConsistentHashPolicy(t *testing.T) {
channels := []string{"ch1", "ch2", "ch3"}
for _, c := range channels {
-err = cluster.Watch(c, 1)
+err = cluster.Watch(context.TODO(), c, 1)
assert.NoError(t, err)
idstr, err := hash.Get(c)
assert.NoError(t, err)
@@ -563,7 +565,7 @@ func TestCluster_Flush(t *testing.T) {
err = cluster.Startup(ctx, nodes)
assert.NoError(t, err)
-err = cluster.Watch("chan-1", 1)
+err = cluster.Watch(context.Background(), "chan-1", 1)
assert.NoError(t, err)
// flush empty should impact nothing
@@ -610,7 +612,7 @@ func TestCluster_Import(t *testing.T) {
err = cluster.Startup(ctx, nodes)
assert.NoError(t, err)
-err = cluster.Watch("chan-1", 1)
+err = cluster.Watch(ctx, "chan-1", 1)
assert.NoError(t, err)
assert.NotPanics(t, func() {


@@ -356,7 +356,7 @@ func TestFlush(t *testing.T) {
err := svr.channelManager.AddNode(1)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.Flush(context.TODO(), req)
@@ -1306,12 +1306,13 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.NoError(t, err)
}
+ctx := context.Background()
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
-ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
@@ -1393,12 +1394,12 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.NoError(t, err)
}
+ctx := context.Background()
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
-ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
@@ -1471,12 +1472,12 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.NoError(t, err)
}
+ctx := context.Background()
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
-ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
@@ -1525,12 +1526,12 @@ func TestSaveBinlogPaths(t *testing.T) {
ID: 0,
})
+ctx := context.Background()
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
-ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
@@ -1575,7 +1576,7 @@ func TestSaveBinlogPaths(t *testing.T) {
defer closeTestServer(t, svr)
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
s := &datapb.SegmentInfo{
ID: 1,
@@ -1690,12 +1691,12 @@ func TestDropVirtualChannel(t *testing.T) {
svr.meta.AddSegment(context.TODO(), NewSegmentInfo(os))
+ctx := context.Background()
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
-ctx := context.Background()
req := &datapb.DropVirtualChannelRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
@@ -1765,7 +1766,7 @@ func TestDropVirtualChannel(t *testing.T) {
<-spyCh
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(ctx, &channel{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
// resend
@@ -1779,7 +1780,7 @@ func TestDropVirtualChannel(t *testing.T) {
defer closeTestServer(t, svr)
err := svr.channelManager.AddNode(0)
require.Nil(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 0})
require.Nil(t, err)
resp, err := svr.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{
@@ -2844,7 +2845,7 @@ func TestGetRecoveryInfo(t *testing.T) {
err = svr.channelManager.AddNode(0)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "vchan1", CollectionID: 0})
+err = svr.channelManager.Watch(context.TODO(), &channel{Name: "vchan1", CollectionID: 0})
assert.NoError(t, err)
sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq)
@@ -3926,7 +3927,7 @@ func TestDataCoord_Import(t *testing.T) {
})
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(svr.ctx, &channel{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.Import(svr.ctx, &datapb.ImportTaskRequest{
@@ -3945,7 +3946,7 @@ func TestDataCoord_Import(t *testing.T) {
err := svr.channelManager.AddNode(0)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
+err = svr.channelManager.Watch(svr.ctx, &channel{Name: "ch1", CollectionID: 0})
assert.NoError(t, err)
resp, err := svr.Import(svr.ctx, &datapb.ImportTaskRequest{
@@ -4089,7 +4090,7 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
})
err := svr.channelManager.AddNode(110)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 100})
+err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 100})
assert.NoError(t, err)
status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{
@@ -4126,7 +4127,7 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
err := svr.channelManager.AddNode(110)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 100})
+err = svr.channelManager.Watch(context.TODO(), &channel{Name: "ch1", CollectionID: 100})
assert.NoError(t, err)
status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{


@@ -193,7 +193,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
}
// Add the channel to cluster for watching.
-s.cluster.Watch(r.ChannelName, r.CollectionID)
+s.cluster.Watch(ctx, r.ChannelName, r.CollectionID)
segmentAllocations := make([]*Allocation, 0)
if r.GetIsImport() {
@@ -1175,7 +1175,7 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
Schema: req.GetSchema(),
CreateTimestamp: req.GetCreateTimestamp(),
}
-err := s.channelManager.Watch(ch)
+err := s.channelManager.Watch(ctx, ch)
if err != nil {
log.Warn("fail to watch channelName", zap.Error(err))
resp.Status = merr.Status(err)
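
On the server side (AssignSegmentID and WatchChannels above), the context handed to Watch is the incoming RPC's own ctx, so whatever trace metadata the request carries ends up on the watch logs, and cancellation of the RPC also propagates into the watch path. A hedged sketch of that flow; the server type, watchChannel helper, and WatchChannels signature below are simplified stand-ins, not the real datapb service.

```go
package main

import (
	"context"
	"fmt"
)

type server struct{}

// watchChannel stands in for channelManager.Watch: it respects the caller's
// ctx, so an RPC deadline or cancellation stops the work and surfaces in the
// returned error.
func watchChannel(ctx context.Context, name string) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("watch %s aborted: %w", name, ctx.Err())
	default:
		fmt.Printf("watching %s\n", name)
		return nil
	}
}

// WatchChannels mimics the shape of the RPC handler: the request-scoped ctx
// it receives is forwarded straight into the watch call rather than being
// replaced by context.Background().
func (s *server) WatchChannels(ctx context.Context, channels []string) error {
	for _, ch := range channels {
		if err := watchChannel(ctx, ch); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	srv := &server{}
	fmt.Println(srv.WatchChannels(ctx, []string{"ch1"})) // <nil>
	cancel()
	fmt.Println(srv.WatchChannels(ctx, []string{"ch2"})) // watch ch2 aborted: context canceled
}
```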


@@ -232,7 +232,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
ch := &channel{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
-svr.channelManager.Watch(ch)
+svr.channelManager.Watch(context.Background(), ch)
req := &datapb.GetRecoveryInfoRequestV2{
CollectionID: 0,
@@ -312,7 +312,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
ch := &channel{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
-svr.channelManager.Watch(ch)
+svr.channelManager.Watch(context.Background(), ch)
req := &datapb.GetRecoveryInfoRequestV2{
CollectionID: 0,
@@ -405,7 +405,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
err = svr.channelManager.AddNode(0)
assert.NoError(t, err)
-err = svr.channelManager.Watch(&channel{Name: "vchan1", CollectionID: 0})
+err = svr.channelManager.Watch(context.Background(), &channel{Name: "vchan1", CollectionID: 0})
assert.NoError(t, err)
sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq)
@@ -456,7 +456,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
ch := &channel{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
-svr.channelManager.Watch(ch)
+svr.channelManager.Watch(context.Background(), ch)
req := &datapb.GetRecoveryInfoRequestV2{
CollectionID: 0,
@@ -502,7 +502,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
ch := &channel{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
-svr.channelManager.Watch(ch)
+svr.channelManager.Watch(context.Background(), ch)
req := &datapb.GetRecoveryInfoRequestV2{
CollectionID: 0,
@@ -586,7 +586,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
ch := &channel{Name: "vchan1", CollectionID: 0}
svr.channelManager.AddNode(0)
-svr.channelManager.Watch(ch)
+svr.channelManager.Watch(context.Background(), ch)
req := &datapb.GetRecoveryInfoRequestV2{
CollectionID: 0,


@@ -91,7 +91,7 @@ func (lb *LBPolicyImpl) Start(ctx context.Context) {
// try to select the best node from the available nodes
func (lb *LBPolicyImpl) selectNode(ctx context.Context, workload ChannelWorkload, excludeNodes typeutil.UniqueSet) (int64, error) {
-log := log.With(
+log := log.Ctx(ctx).With(
zap.Int64("collectionID", workload.collectionID),
zap.String("collectionName", workload.collectionName),
zap.String("channelName", workload.channel),