From 175a656ff1e9c2b0a9a00dee173b1018e1b53bf2 Mon Sep 17 00:00:00 2001
From: Bingyi Sun
Date: Wed, 17 Nov 2021 23:25:12 +0800
Subject: [PATCH] Unwatch dropped channel when init channel manager (#11986)

issue: #11558

Signed-off-by: sunby
Co-authored-by: sunby
---
 internal/datacoord/channel_manager.go      |  38 +++++-
 internal/datacoord/channel_manager_test.go |   4 +-
 internal/datacoord/cluster_test.go         |  32 ++---
 internal/datacoord/datanode_helper.go      |  38 ------
 internal/datacoord/handler.go              | 134 +++++++++++++++++++++
 internal/datacoord/mock_test.go            |  18 +++
 internal/datacoord/server.go               | 103 +---------------
 internal/datacoord/server_test.go          |  12 +-
 internal/datacoord/services.go             |  23 +---
 9 files changed, 214 insertions(+), 188 deletions(-)
 delete mode 100644 internal/datacoord/datanode_helper.go
 create mode 100644 internal/datacoord/handler.go

diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index 17915aa520..97b1c3097a 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
@@ -36,7 +36,7 @@ const (
 // ChannelManager manages the allocation and the balance of channels between datanodes
 type ChannelManager struct {
     mu             sync.RWMutex
-    posProvider    positionProvider
+    h              Handler
     store          RWChannelStore
     factory        ChannelPolicyFactory
     registerPolicy RegisterPolicy
@@ -62,11 +62,15 @@ func defaultFactory(hash *consistent.Consistent) ChannelPolicyFactory {
 }
 
 // NewChannelManager returns a new ChannelManager
-func NewChannelManager(kv kv.TxnKV, posProvider positionProvider, options ...ChannelManagerOpt) (*ChannelManager, error) {
+func NewChannelManager(
+    kv kv.TxnKV,
+    h Handler,
+    options ...ChannelManagerOpt,
+) (*ChannelManager, error) {
     c := &ChannelManager{
-        posProvider: posProvider,
-        factory:     NewChannelPolicyFactoryV1(kv),
-        store:       NewChannelStore(kv),
+        h:       h,
+        factory: NewChannelPolicyFactoryV1(kv),
+        store:   NewChannelStore(kv),
     }
 
     if err := c.store.Reload(); err != nil {
@@ -106,6 +110,9 @@ func (c *ChannelManager) Startup(nodes []int64) error {
             return err
         }
     }
+
+    c.unwatchDroppedChannels()
+
     log.Debug("cluster start up",
         zap.Any("nodes", nodes),
         zap.Any("olds", olds),
@@ -114,6 +121,21 @@ func (c *ChannelManager) Startup(nodes []int64) error {
     return nil
 }
 
+func (c *ChannelManager) unwatchDroppedChannels() {
+    nodeChannels := c.store.GetNodesChannels()
+    for _, nodeChannel := range nodeChannels {
+        for _, ch := range nodeChannel.Channels {
+            if !c.h.CheckShouldDropChannel(ch.Name) {
+                continue
+            }
+            err := c.remove(nodeChannel.NodeID, ch)
+            if err != nil {
+                log.Warn("unable to remove channel", zap.String("channel", ch.Name), zap.Error(err))
+            }
+        }
+    }
+}
+
 func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
     timer := time.NewTicker(bgCheckInterval)
     for {
@@ -242,7 +264,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
 
 func (c *ChannelManager) fillChannelPosition(update *ChannelOp) {
     for _, ch := range update.Channels {
-        vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
+        vchan := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
         info := &datapb.ChannelWatchInfo{
             Vchan:   vchan,
             StartTs: time.Now().Unix(),
@@ -320,6 +342,10 @@ func (c *ChannelManager) RemoveChannel(channelName string) error {
         return nil
     }
 
+    return c.remove(nodeID, ch)
+}
+
+func (c *ChannelManager) remove(nodeID int64, ch *channel) error {
     var op ChannelOpSet
     op.Delete(nodeID, []*channel{ch})
     if err := c.store.Update(op); err != nil {
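Editor's note: the hunks above are the heart of the fix. On Startup, the manager now sweeps every node's watched channels and unwatches any channel that the Handler (an interface defined in the new handler.go below) reports as dropped. Here is a minimal, self-contained sketch of that sweep; every type in it is a simplified stand-in for the real datacoord types, not the patch's code:

```go
package main

import "fmt"

// Simplified stand-ins: the real channel, NodeChannelInfo, Handler and
// ChannelManager in datacoord carry considerably more state.
type channel struct {
	Name         string
	CollectionID int64
}

type NodeChannelInfo struct {
	NodeID   int64
	Channels []*channel
}

type Handler interface {
	CheckShouldDropChannel(channel string) bool
}

// droppedSet is a toy Handler: a channel should be dropped iff it is listed.
type droppedSet map[string]bool

func (d droppedSet) CheckShouldDropChannel(ch string) bool { return d[ch] }

type ChannelManager struct {
	h     Handler
	store []*NodeChannelInfo // stand-in for RWChannelStore.GetNodesChannels()
}

func (c *ChannelManager) remove(nodeID int64, ch *channel) error {
	fmt.Printf("unwatching channel %q on node %d\n", ch.Name, nodeID)
	return nil
}

// unwatchDroppedChannels mirrors the loop added above: on startup, visit every
// node's watched channels and remove the ones the Handler says are dropped.
func (c *ChannelManager) unwatchDroppedChannels() {
	for _, nodeChannel := range c.store {
		for _, ch := range nodeChannel.Channels {
			if !c.h.CheckShouldDropChannel(ch.Name) {
				continue
			}
			if err := c.remove(nodeChannel.NodeID, ch); err != nil {
				fmt.Printf("unable to remove channel %q: %v\n", ch.Name, err)
			}
		}
	}
}

func main() {
	cm := &ChannelManager{
		h: droppedSet{"ch-dropped": true},
		store: []*NodeChannelInfo{
			{NodeID: 1, Channels: []*channel{{Name: "ch-live"}, {Name: "ch-dropped"}}},
		},
	}
	cm.unwatchDroppedChannels() // only "ch-dropped" is unwatched
}
```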
diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go
index e882d2e801..cd6942a9f9 100644
--- a/internal/datacoord/channel_manager_test.go
+++ b/internal/datacoord/channel_manager_test.go
@@ -29,7 +29,7 @@ func TestReload(t *testing.T) {
     Params.Init()
     kv := memkv.NewMemoryKV()
     hash := consistent.New()
-    cm, err := NewChannelManager(kv, &dummyPosProvider{}, withFactory(NewConsistentHashChannelPolicyFactory(hash)))
+    cm, err := NewChannelManager(kv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash)))
     assert.Nil(t, err)
     assert.Nil(t, cm.AddNode(1))
     assert.Nil(t, cm.AddNode(2))
@@ -37,7 +37,7 @@
     assert.Nil(t, cm.Watch(&channel{"channel2", 1}))
 
     hash2 := consistent.New()
-    cm2, err := NewChannelManager(kv, &dummyPosProvider{}, withFactory(NewConsistentHashChannelPolicyFactory(hash2)))
+    cm2, err := NewChannelManager(kv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash2)))
     assert.Nil(t, err)
     assert.Nil(t, cm2.Startup([]int64{1, 2}))
     assert.Nil(t, cm2.AddNode(3))
diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go
index 8ce71ede93..12871c0ec9 100644
--- a/internal/datacoord/cluster_test.go
+++ b/internal/datacoord/cluster_test.go
@@ -34,7 +34,7 @@ func TestClusterCreate(t *testing.T) {
     t.Run("startup normally", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         defer cluster.Close()
@@ -67,7 +67,7 @@ func TestClusterCreate(t *testing.T) {
         assert.Nil(t, err)
 
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         defer cluster.Close()
@@ -82,7 +82,7 @@ func TestClusterCreate(t *testing.T) {
     t.Run("remove all nodes and restart with other nodes", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
 
@@ -103,7 +103,7 @@ func TestClusterCreate(t *testing.T) {
         cluster.Close()
 
         sessionManager2 := NewSessionManager()
-        channelManager2, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager2, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         clusterReload := NewCluster(sessionManager2, channelManager2)
         defer clusterReload.Close()
@@ -128,7 +128,7 @@ func TestClusterCreate(t *testing.T) {
     t.Run("loadKv Fails", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         fkv := &loadPrefixFailKV{TxnKV: kv}
-        _, err := NewChannelManager(fkv, dummyPosProvider{})
+        _, err := NewChannelManager(fkv, newMockHandler())
         assert.NotNil(t, err)
     })
 }
@@ -147,7 +147,7 @@ func TestRegister(t *testing.T) {
     t.Run("register to empty cluster", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         defer cluster.Close()
@@ -168,7 +168,7 @@ func TestRegister(t *testing.T) {
     t.Run("register to empty cluster with buffer channels", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         err = channelManager.Watch(&channel{
             Name: "ch1",
@@ -197,7 +197,7 @@ func TestRegister(t *testing.T) {
     t.Run("register and restart with no channel", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         addr := "localhost:8080"
@@ -212,7 +212,7 @@ func TestRegister(t *testing.T) {
         cluster.Close()
 
         sessionManager2 := NewSessionManager()
-        channelManager2, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager2, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         restartCluster := NewCluster(sessionManager2, channelManager2)
         defer restartCluster.Close()
@@ -225,7 +225,7 @@ func TestUnregister(t *testing.T) {
     t.Run("remove node after unregister", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         defer cluster.Close()
@@ -246,7 +246,7 @@ func TestUnregister(t *testing.T) {
     t.Run("move channels to online nodes after unregister", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         defer cluster.Close()
@@ -280,7 +280,7 @@ func TestUnregister(t *testing.T) {
         }
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         defer cluster.Close()
@@ -311,7 +311,7 @@ func TestWatchIfNeeded(t *testing.T) {
         }
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         defer cluster.Close()
@@ -334,7 +334,7 @@ func TestWatchIfNeeded(t *testing.T) {
     t.Run("watch channel to empty cluster", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
         assert.Nil(t, err)
         cluster := NewCluster(sessionManager, channelManager)
         defer cluster.Close()
@@ -355,7 +355,7 @@ func TestConsistentHashPolicy(t *testing.T) {
     sessionManager := NewSessionManager()
     chash := consistent.New()
     factory := NewConsistentHashChannelPolicyFactory(chash)
-    channelManager, err := NewChannelManager(kv, dummyPosProvider{}, withFactory(factory))
+    channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(factory))
     assert.Nil(t, err)
     cluster := NewCluster(sessionManager, channelManager)
     defer cluster.Close()
@@ -430,7 +430,7 @@ func TestConsistentHashPolicy(t *testing.T) {
 func TestCluster_Flush(t *testing.T) {
     kv := memkv.NewMemoryKV()
     sessionManager := NewSessionManager()
-    channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+    channelManager, err := NewChannelManager(kv, newMockHandler())
     assert.Nil(t, err)
     cluster := NewCluster(sessionManager, channelManager)
     defer cluster.Close()
diff --git a/internal/datacoord/datanode_helper.go b/internal/datacoord/datanode_helper.go
deleted file mode 100644
index 73c61266f8..0000000000
--- a/internal/datacoord/datanode_helper.go
+++ /dev/null
@@ -1,38 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package datacoord
-
-import (
-    "github.com/milvus-io/milvus/internal/proto/datapb"
-)
-
-// positionProvider provides vchannel pair related position pairs
-type positionProvider interface {
-    GetVChanPositions(channel string, collectionID UniqueID, paritionID UniqueID) *datapb.VchannelInfo
-}
-
-var _ positionProvider = (*dummyPosProvider)(nil)
-
-type dummyPosProvider struct{}
-
-//GetVChanPositions implements positionProvider
-func (dp dummyPosProvider) GetVChanPositions(channel string, collectionID UniqueID, paritionID UniqueID) *datapb.VchannelInfo {
-    return &datapb.VchannelInfo{
-        CollectionID: collectionID,
-        ChannelName:  channel,
-    }
-}
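Editor's note: the deleted datanode_helper.go carried a compile-time conformance check, `var _ positionProvider = (*dummyPosProvider)(nil)`, which disappears with the file. A hypothetical follow-up, not part of this patch, could keep the same zero-cost guard for the new Handler interface introduced in handler.go below:

```go
package datacoord

// Hypothetical compile-time guard (not in this patch), mirroring the deleted
// assertion: a build failure here is cheaper than a runtime surprise at a
// call site if ServerHandler ever drifts out of sync with the interface.
var _ Handler = (*ServerHandler)(nil)

// The test double could be guarded the same way in mock_test.go:
// var _ Handler = (*mockHandler)(nil)
```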
diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go
new file mode 100644
index 0000000000..d34f3aa2c1
--- /dev/null
+++ b/internal/datacoord/handler.go
@@ -0,0 +1,134 @@
+package datacoord
+
+import (
+    "context"
+
+    "github.com/milvus-io/milvus/internal/log"
+    "github.com/milvus-io/milvus/internal/proto/commonpb"
+    "github.com/milvus-io/milvus/internal/proto/datapb"
+    "github.com/milvus-io/milvus/internal/proto/internalpb"
+    "github.com/milvus-io/milvus/internal/rootcoord"
+    "go.uber.org/zap"
+)
+
+type Handler interface {
+    GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo
+    CheckShouldDropChannel(channel string) bool
+}
+
+// ServerHandler is a helper of Server
+type ServerHandler struct {
+    s *Server
+}
+
+func newServerHandler(s *Server) *ServerHandler {
+    return &ServerHandler{s: s}
+}
+
+// GetVChanPositions gets the latest positions for the provided DML channel name
+func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
+    segments := h.s.meta.GetSegmentsByChannel(channel)
+    log.Debug("GetSegmentsByChannel",
+        zap.Any("collectionID", collectionID),
+        zap.Any("channel", channel),
+        zap.Any("numOfSegments", len(segments)),
+    )
+    var flushed []*datapb.SegmentInfo
+    var unflushed []*datapb.SegmentInfo
+    var seekPosition *internalpb.MsgPosition
+    for _, s := range segments {
+        if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
+            (s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
+            continue
+        }
+
+        if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
+            flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
+        } else {
+            unflushed = append(unflushed, s.SegmentInfo)
+        }
+
+        var segmentPosition *internalpb.MsgPosition
+        if s.GetDmlPosition() != nil {
+            segmentPosition = s.GetDmlPosition()
+        } else {
+            segmentPosition = s.GetStartPosition()
+        }
+
+        if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
+            seekPosition = segmentPosition
+        }
+    }
+    // use collection start position when segment position is not found
+    if seekPosition == nil {
+        collection := h.GetCollection(h.s.ctx, collectionID)
+        if collection != nil {
+            seekPosition = getCollectionStartPosition(channel, collection)
+        }
+    }
+
+    return &datapb.VchannelInfo{
+        CollectionID:      collectionID,
+        ChannelName:       channel,
+        SeekPosition:      seekPosition,
+        FlushedSegments:   flushed,
+        UnflushedSegments: unflushed,
+    }
+}
+
+func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition {
+    for _, sp := range collectionInfo.GetStartPositions() {
+        if sp.GetKey() != rootcoord.ToPhysicalChannel(channel) {
+            continue
+        }
+        return &internalpb.MsgPosition{
+            ChannelName: channel,
+            MsgID:       sp.GetData(),
+        }
+    }
+    return nil
+}
+
+// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
+func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
+    return &datapb.SegmentInfo{
+        ID:             info.ID,
+        CollectionID:   info.CollectionID,
+        PartitionID:    info.PartitionID,
+        InsertChannel:  info.InsertChannel,
+        NumOfRows:      info.NumOfRows,
+        State:          info.State,
+        MaxRowNum:      info.MaxRowNum,
+        LastExpireTime: info.LastExpireTime,
+        StartPosition:  info.StartPosition,
+        DmlPosition:    info.DmlPosition,
+    }
+}
+
+func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) *datapb.CollectionInfo {
+    coll := h.s.meta.GetCollection(collectionID)
+    if coll != nil {
+        return coll
+    }
+    err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
+    if err != nil {
+        log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
+    }
+
+    return h.s.meta.GetCollection(collectionID)
+}
+
+func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
+    segments := h.s.meta.GetSegmentsByChannel(channel)
+    for _, segment := range segments {
+        if segment.GetStartPosition() != nil && // filter empty segments
+            // FIXME: we filter compaction-generated segments
+            // because the datanode may not know the segment due to network lag or
+            // a datacoord crash when handling CompleteCompaction.
+            len(segment.CompactionFrom) == 0 &&
+            segment.GetState() != commonpb.SegmentState_Dropped {
+            return false
+        }
+    }
+    return true
+}
diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go
index 4acd2bb2d3..03ae902a55 100644
--- a/internal/datacoord/mock_test.go
+++ b/internal/datacoord/mock_test.go
@@ -584,3 +584,21 @@ func (t *mockCompactionTrigger) stop() {
     }
     panic("not implemented")
 }
+
+type mockHandler struct {
+}
+
+func newMockHandler() *mockHandler {
+    return &mockHandler{}
+}
+
+func (h *mockHandler) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
+    return &datapb.VchannelInfo{
+        CollectionID: collectionID,
+        ChannelName:  channel,
+    }
+}
+
+func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
+    return false
+}
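Editor's note: ServerHandler.GetVChanPositions above picks the seek position as the minimum timestamp over the segments' positions, preferring each segment's DML position and falling back to its start position, so replay on recovery covers every segment's unconsumed data. A self-contained sketch of just that selection rule, with MsgPosition pared down to the one field the rule needs:

```go
package main

import "fmt"

// MsgPosition is a pared-down stand-in for internalpb.MsgPosition.
type MsgPosition struct {
	ChannelName string
	Timestamp   uint64
}

// earliestPosition applies the same rule as GetVChanPositions: per segment,
// prefer the DML position, fall back to the start position, and seek to the
// minimum timestamp across all segments.
func earliestPosition(dmlPos, startPos []*MsgPosition) *MsgPosition {
	var seek *MsgPosition
	for i := range dmlPos {
		pos := dmlPos[i]
		if pos == nil {
			pos = startPos[i]
		}
		if pos == nil {
			continue // a segment with neither position is skipped entirely
		}
		if seek == nil || pos.Timestamp < seek.Timestamp {
			seek = pos
		}
	}
	return seek
}

func main() {
	dml := []*MsgPosition{{ChannelName: "ch1", Timestamp: 120}, nil}
	start := []*MsgPosition{{ChannelName: "ch1", Timestamp: 100}, {ChannelName: "ch1", Timestamp: 80}}
	fmt.Println(earliestPosition(dml, start).Timestamp) // 80
}
```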
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index ae16f33c55..57bff965f9 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -28,7 +28,6 @@ import (
     datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
     rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
     "github.com/milvus-io/milvus/internal/logutil"
-    "github.com/milvus-io/milvus/internal/rootcoord"
     "github.com/milvus-io/milvus/internal/util/metricsinfo"
     "github.com/milvus-io/milvus/internal/util/mqclient"
     "github.com/milvus-io/milvus/internal/util/tsoutil"
@@ -46,7 +45,6 @@ import (
 
     "github.com/milvus-io/milvus/internal/proto/commonpb"
     "github.com/milvus-io/milvus/internal/proto/datapb"
-    "github.com/milvus-io/milvus/internal/proto/internalpb"
     "github.com/milvus-io/milvus/internal/proto/milvuspb"
 )
 
@@ -89,9 +87,6 @@ type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEnd
 // makes sure Server implements `DataCoord`
 var _ types.DataCoord = (*Server)(nil)
 
-// makes sure Server implements `positionProvider`
-var _ positionProvider = (*Server)(nil)
-
 // Server implements `types.Datacoord`
 // handles Data Cooridinator related jobs
 type Server struct {
@@ -112,6 +107,7 @@ type Server struct {
     rootCoordClient  types.RootCoord
     garbageCollector *garbageCollector
     gcOpt            GcOption
+    handler          Handler
 
     compactionTrigger trigger
     compactionHandler compactionPlanContext
@@ -248,6 +244,8 @@ func (s *Server) Start() error {
         return err
     }
 
+    s.handler = newServerHandler(s)
+
     if err = s.initCluster(); err != nil {
         return err
     }
@@ -282,7 +280,7 @@ func (s *Server) initCluster() error {
     }
 
     var err error
-    s.channelManager, err = NewChannelManager(s.kvClient, s)
+    s.channelManager, err = NewChannelManager(s.kvClient, s.handler)
     if err != nil {
         return err
     }
@@ -783,96 +781,3 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
     s.meta.AddCollection(collInfo)
     return nil
 }
-
-// GetVChanPositions get vchannel latest postitions with provided dml channel names
-func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
-    segments := s.meta.GetSegmentsByChannel(channel)
-    log.Debug("GetSegmentsByChannel",
-        zap.Any("collectionID", collectionID),
-        zap.Any("channel", channel),
-        zap.Any("numOfSegments", len(segments)),
-    )
-    var flushed []*datapb.SegmentInfo
-    var unflushed []*datapb.SegmentInfo
-    var seekPosition *internalpb.MsgPosition
-    for _, s := range segments {
-        if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
-            (s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
-            continue
-        }
-
-        if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
-            flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
-        } else {
-            unflushed = append(unflushed, s.SegmentInfo)
-        }
-
-        var segmentPosition *internalpb.MsgPosition
-        if s.GetDmlPosition() != nil {
-            segmentPosition = s.GetDmlPosition()
-        } else {
-            segmentPosition = s.GetStartPosition()
-        }
-
-        if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
-            seekPosition = segmentPosition
-        }
-    }
-    // use collection start position when segment position is not found
-    if seekPosition == nil {
-        collection := s.GetCollection(s.ctx, collectionID)
-        if collection != nil {
-            seekPosition = getCollectionStartPosition(channel, collection)
-        }
-    }
-
-    return &datapb.VchannelInfo{
-        CollectionID:      collectionID,
-        ChannelName:       channel,
-        SeekPosition:      seekPosition,
-        FlushedSegments:   flushed,
-        UnflushedSegments: unflushed,
-    }
-}
-
-func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition {
-    for _, sp := range collectionInfo.GetStartPositions() {
-        if sp.GetKey() != rootcoord.ToPhysicalChannel(channel) {
-            continue
-        }
-        return &internalpb.MsgPosition{
-            ChannelName: channel,
-            MsgID:       sp.GetData(),
-        }
-    }
-    return nil
-}
-
-// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
-func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
-    return &datapb.SegmentInfo{
-        ID:             info.ID,
-        CollectionID:   info.CollectionID,
-        PartitionID:    info.PartitionID,
-        InsertChannel:  info.InsertChannel,
-        NumOfRows:      info.NumOfRows,
-        State:          info.State,
-        MaxRowNum:      info.MaxRowNum,
-        LastExpireTime: info.LastExpireTime,
-        StartPosition:  info.StartPosition,
-        DmlPosition:    info.DmlPosition,
-    }
-}
-
-func (s *Server) GetCollection(ctx context.Context, collectionID UniqueID) *datapb.CollectionInfo {
-    coll := s.meta.GetCollection(collectionID)
-    if coll != nil {
-        return coll
-    }
-    err := s.loadCollectionFromRootCoord(ctx, collectionID)
-    if err != nil {
-        log.Warn("failed to load collection from RootCoord", zap.Int64("collectionID", collectionID), zap.Error(err))
-    }
-
-    return s.meta.GetCollection(collectionID)
-}
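Editor's note: server.go now only wires the pieces together; Start constructs the ServerHandler before initCluster so NewChannelManager receives a usable Handler. The drop predicate that moved into handler.go is worth restating in isolation: a channel is droppable only when every segment that is non-empty and not compaction-generated has reached the Dropped state. A sketch with pared-down stand-in types:

```go
package main

import "fmt"

// Pared-down stand-ins for the datapb types referenced above.
type segmentState int

const (
	stateGrowing segmentState = iota
	stateDropped
)

type segment struct {
	HasStartPosition bool // false models an empty segment with no position
	CompactionFrom   []int64
	State            segmentState
}

// shouldDropChannel mirrors ServerHandler.CheckShouldDropChannel: any live
// segment that is non-empty and not produced by compaction keeps the
// channel alive.
func shouldDropChannel(segments []*segment) bool {
	for _, s := range segments {
		if s.HasStartPosition && len(s.CompactionFrom) == 0 && s.State != stateDropped {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(shouldDropChannel([]*segment{
		{HasStartPosition: true, State: stateDropped},
		{HasStartPosition: true, CompactionFrom: []int64{1, 2}}, // compacted, ignored
		{HasStartPosition: false},                               // empty, ignored
	})) // true: nothing live is left on the channel
}
```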
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index a49712d5b6..8591f3fcab 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -1182,13 +1182,13 @@ func TestGetVChannelPos(t *testing.T) {
     assert.Nil(t, err)
 
     t.Run("get unexisted channel", func(t *testing.T) {
-        vchan := svr.GetVChanPositions("chx1", 0, allPartitionID)
+        vchan := svr.handler.GetVChanPositions("chx1", 0, allPartitionID)
         assert.Empty(t, vchan.UnflushedSegments)
         assert.Empty(t, vchan.FlushedSegments)
     })
 
     t.Run("get existed channel", func(t *testing.T) {
-        vchan := svr.GetVChanPositions("ch1", 0, allPartitionID)
+        vchan := svr.handler.GetVChanPositions("ch1", 0, allPartitionID)
         assert.EqualValues(t, 1, len(vchan.FlushedSegments))
         assert.EqualValues(t, 1, vchan.FlushedSegments[0].ID)
         assert.EqualValues(t, 2, len(vchan.UnflushedSegments))
@@ -1196,7 +1196,7 @@ func TestGetVChannelPos(t *testing.T) {
     })
 
     t.Run("empty collection", func(t *testing.T) {
-        infos := svr.GetVChanPositions("ch0_suffix", 1, allPartitionID)
+        infos := svr.handler.GetVChanPositions("ch0_suffix", 1, allPartitionID)
         assert.EqualValues(t, 1, infos.CollectionID)
         assert.EqualValues(t, 0, len(infos.FlushedSegments))
         assert.EqualValues(t, 0, len(infos.UnflushedSegments))
@@ -1204,7 +1204,7 @@ func TestGetVChannelPos(t *testing.T) {
     })
 
     t.Run("filter partition", func(t *testing.T) {
-        infos := svr.GetVChanPositions("ch1", 0, 1)
+        infos := svr.handler.GetVChanPositions("ch1", 0, 1)
         assert.EqualValues(t, 0, infos.CollectionID)
         assert.EqualValues(t, 0, len(infos.FlushedSegments))
         assert.EqualValues(t, 1, len(infos.UnflushedSegments))
@@ -1622,7 +1622,7 @@ func TestOptions(t *testing.T) {
     t.Run("SetCluster", func(t *testing.T) {
         kv := memkv.NewMemoryKV()
         sessionManager := NewSessionManager()
-        channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+        channelManager, err := NewChannelManager(kv, newMockHandler())
        assert.Nil(t, err)
 
         cluster := NewCluster(sessionManager, channelManager)
@@ -1670,7 +1670,7 @@ func (p *mockPolicyFactory) NewDeregisterPolicy() DeregisterPolicy {
 
 func TestHandleSessionEvent(t *testing.T) {
     kv := memkv.NewMemoryKV()
-    channelManager, err := NewChannelManager(kv, dummyPosProvider{}, withFactory(&mockPolicyFactory{}))
+    channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(&mockPolicyFactory{}))
     assert.Nil(t, err)
     sessionManager := NewSessionManager()
     cluster := NewCluster(sessionManager, channelManager)
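Editor's note: the tests above now reach vchannel positions through svr.handler, and the channel-manager tests swap in newMockHandler(). That seam also makes the new startup sweep testable. Here is a sketch of a regression test built from calls that appear in the diffs above; the memkv import path is an assumption, and a real test would also assert on the store's remaining channels after Startup:

```go
package datacoord

import (
	"testing"

	memkv "github.com/milvus-io/milvus/internal/kv/mem" // assumed path
	"github.com/stretchr/testify/assert"
)

// dropAllHandler embeds the patch's mockHandler (for GetVChanPositions) and
// overrides the drop check so that every channel is reported as dropped.
type dropAllHandler struct{ mockHandler }

func (h *dropAllHandler) CheckShouldDropChannel(channel string) bool { return true }

func TestStartupUnwatchesDroppedChannels(t *testing.T) {
	kv := memkv.NewMemoryKV()
	cm, err := NewChannelManager(kv, newMockHandler())
	assert.Nil(t, err)
	assert.Nil(t, cm.AddNode(1))
	assert.Nil(t, cm.Watch(&channel{"ch1", 1}))

	// Reload with a handler that marks everything dropped: Startup should
	// sweep ch1 away via unwatchDroppedChannels.
	cm2, err := NewChannelManager(kv, &dropAllHandler{})
	assert.Nil(t, err)
	assert.Nil(t, cm2.Startup([]int64{1}))
}
```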
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index bf7bab2d1b..b783e9ec4e 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -114,10 +114,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
             zap.String("channelName", r.GetChannelName()),
             zap.Uint32("count", r.GetCount()))
 
-        if coll := s.GetCollection(ctx, r.CollectionID); coll == nil {
-            continue
-        }
-
         s.cluster.Watch(r.ChannelName, r.CollectionID)
 
         allocations, err := s.segmentManager.AllocSegment(ctx,
@@ -347,7 +343,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
     log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
         zap.Any("meta", req.GetField2BinlogPaths()))
 
-    if req.GetDropped() && s.checkShouldDropChannel(channel) {
+    if req.GetDropped() && s.handler.CheckShouldDropChannel(channel) {
         log.Debug("remove channel", zap.String("channel", channel))
         err = s.channelManager.RemoveChannel(channel)
         if err != nil {
@@ -378,21 +374,6 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
     return resp, nil
 }
 
-func (s *Server) checkShouldDropChannel(channel string) bool {
-    segments := s.meta.GetSegmentsByChannel(channel)
-    for _, segment := range segments {
-        if segment.GetStartPosition() != nil && // fitler empty segment
-            // FIXME: we filter compaction generated segments
-            // because datanode may not know the segment due to the network lag or
-            // datacoord crash when handling CompleteCompaction.
-            len(segment.CompactionFrom) == 0 &&
-            segment.GetState() != commonpb.SegmentState_Dropped {
-            return false
-        }
-    }
-    return true
-}
-
 // GetComponentStates returns DataCoord's current state
 func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
     resp := &internalpb.ComponentStates{
@@ -521,7 +502,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
     channels := dresp.GetVirtualChannelNames()
     channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
     for _, c := range channels {
-        channelInfo := s.GetVChanPositions(c, collectionID, partitionID)
+        channelInfo := s.handler.GetVChanPositions(c, collectionID, partitionID)
         channelInfos = append(channelInfos, channelInfo)
         log.Debug("datacoord append channelInfo in GetRecoveryInfo",
             zap.Any("collectionID", collectionID),
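Editor's note: GetRecoveryInfo now delegates each vchannel to handler.GetVChanPositions, whose last resort is getCollectionStartPosition: match the vchannel's physical channel against the start positions the collection recorded. A stand-in sketch of that fallback; toPhysicalChannel here assumes a `<physical>_<suffix>` naming convention, and the real rootcoord.ToPhysicalChannel may differ:

```go
package main

import (
	"fmt"
	"strings"
)

// toPhysicalChannel is a stand-in for rootcoord.ToPhysicalChannel, assuming
// the vchannel is named "<physical>_<shard suffix>".
func toPhysicalChannel(vchannel string) string {
	if i := strings.LastIndex(vchannel, "_"); i >= 0 {
		return vchannel[:i]
	}
	return vchannel
}

// keyDataPair stands in for the key/data pairs in CollectionInfo.StartPositions.
type keyDataPair struct {
	Key  string
	Data []byte
}

// collectionStartPosition mirrors getCollectionStartPosition above: when no
// segment yields a seek position, fall back to the start position the
// collection recorded for this vchannel's physical channel.
func collectionStartPosition(vchannel string, startPositions []keyDataPair) []byte {
	for _, sp := range startPositions {
		if sp.Key == toPhysicalChannel(vchannel) {
			return sp.Data
		}
	}
	return nil
}

func main() {
	sps := []keyDataPair{{Key: "by-dev-dml-ch0", Data: []byte{0x01}}}
	fmt.Println(collectionStartPosition("by-dev-dml-ch0_123v0", sps)) // [1]
}
```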