Unwatch dropped channel when init channel manager (#11986)

issue: #11558
Signed-off-by: sunby <bingyi.sun@zilliz.com>

Co-authored-by: sunby <bingyi.sun@zilliz.com>
Bingyi Sun 2021-11-17 23:25:12 +08:00 committed by GitHub
parent 240e8a3258
commit 175a656ff1
9 changed files with 214 additions and 188 deletions

View File

@@ -36,7 +36,7 @@ const (
 // ChannelManager manages the allocation and the balance of channels between datanodes
 type ChannelManager struct {
 	mu             sync.RWMutex
-	posProvider    positionProvider
+	h              Handler
 	store          RWChannelStore
 	factory        ChannelPolicyFactory
 	registerPolicy RegisterPolicy
@@ -62,11 +62,15 @@ func defaultFactory(hash *consistent.Consistent) ChannelPolicyFactory {
 }
 
 // NewChannelManager returns a new ChannelManager
-func NewChannelManager(kv kv.TxnKV, posProvider positionProvider, options ...ChannelManagerOpt) (*ChannelManager, error) {
+func NewChannelManager(
+	kv kv.TxnKV,
+	h Handler,
+	options ...ChannelManagerOpt,
+) (*ChannelManager, error) {
 	c := &ChannelManager{
-		posProvider: posProvider,
-		factory:     NewChannelPolicyFactoryV1(kv),
-		store:       NewChannelStore(kv),
+		h:       h,
+		factory: NewChannelPolicyFactoryV1(kv),
+		store:   NewChannelStore(kv),
 	}
 
 	if err := c.store.Reload(); err != nil {
@@ -106,6 +110,9 @@ func (c *ChannelManager) Startup(nodes []int64) error {
 			return err
 		}
 	}
+
+	c.unwatchDroppedChannels()
+
 	log.Debug("cluster start up",
 		zap.Any("nodes", nodes),
 		zap.Any("olds", olds),
@@ -114,6 +121,21 @@ func (c *ChannelManager) Startup(nodes []int64) error {
 	return nil
 }
 
+func (c *ChannelManager) unwatchDroppedChannels() {
+	nodeChannels := c.store.GetNodesChannels()
+	for _, nodeChannel := range nodeChannels {
+		for _, ch := range nodeChannel.Channels {
+			if !c.h.CheckShouldDropChannel(ch.Name) {
+				continue
+			}
+			err := c.remove(nodeChannel.NodeID, ch)
+			if err != nil {
+				log.Warn("unable to remove channel", zap.String("channel", ch.Name), zap.Error(err))
+			}
+		}
+	}
+}
+
 func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
 	timer := time.NewTicker(bgCheckInterval)
 	for {
@@ -242,7 +264,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
 func (c *ChannelManager) fillChannelPosition(update *ChannelOp) {
 	for _, ch := range update.Channels {
-		vchan := c.posProvider.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
+		vchan := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
 		info := &datapb.ChannelWatchInfo{
 			Vchan:   vchan,
 			StartTs: time.Now().Unix(),
@@ -320,6 +342,10 @@ func (c *ChannelManager) RemoveChannel(channelName string) error {
 		return nil
 	}
 
+	return c.remove(nodeID, ch)
+}
+
+func (c *ChannelManager) remove(nodeID int64, ch *channel) error {
 	var op ChannelOpSet
 	op.Delete(nodeID, []*channel{ch})
 	if err := c.store.Update(op); err != nil {
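The behavioral core of this file's change: Startup now runs unwatchDroppedChannels right after the channel store is reloaded, removing every watched channel that the Handler reports as dropped. A minimal sketch of the resulting behavior, in the style of TestReload further down; dropAllHandler and TestStartupUnwatchSketch are hypothetical names introduced here for illustration, while memkv, assert, mockHandler, and channel are the test helpers visible in this commit's test files:

	// Hypothetical: a Handler that reports every channel as droppable. It embeds
	// the mockHandler added by this commit and overrides only the drop check.
	type dropAllHandler struct{ mockHandler }

	func (h *dropAllHandler) CheckShouldDropChannel(channel string) bool { return true }

	func TestStartupUnwatchSketch(t *testing.T) {
		kv := memkv.NewMemoryKV()

		// The first manager watches ch1 and persists it through the channel store.
		cm, err := NewChannelManager(kv, newMockHandler())
		assert.Nil(t, err)
		assert.Nil(t, cm.AddNode(1))
		assert.Nil(t, cm.Watch(&channel{"ch1", 1}))

		// A second manager backed by the same kv store simulates a restart:
		// Startup reloads ch1, asks the handler, and unwatches it as dropped.
		cm2, err := NewChannelManager(kv, &dropAllHandler{})
		assert.Nil(t, err)
		assert.Nil(t, cm2.Startup([]int64{1}))
	}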

View File

@@ -29,7 +29,7 @@ func TestReload(t *testing.T) {
 		Params.Init()
 		kv := memkv.NewMemoryKV()
 		hash := consistent.New()
-		cm, err := NewChannelManager(kv, &dummyPosProvider{}, withFactory(NewConsistentHashChannelPolicyFactory(hash)))
+		cm, err := NewChannelManager(kv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash)))
 		assert.Nil(t, err)
 		assert.Nil(t, cm.AddNode(1))
 		assert.Nil(t, cm.AddNode(2))
@@ -37,7 +37,7 @@ func TestReload(t *testing.T) {
 		assert.Nil(t, cm.Watch(&channel{"channel2", 1}))
 
 		hash2 := consistent.New()
-		cm2, err := NewChannelManager(kv, &dummyPosProvider{}, withFactory(NewConsistentHashChannelPolicyFactory(hash2)))
+		cm2, err := NewChannelManager(kv, newMockHandler(), withFactory(NewConsistentHashChannelPolicyFactory(hash2)))
 		assert.Nil(t, err)
 		assert.Nil(t, cm2.Startup([]int64{1, 2}))
 		assert.Nil(t, cm2.AddNode(3))

View File

@@ -34,7 +34,7 @@ func TestClusterCreate(t *testing.T) {
 	t.Run("startup normally", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		defer cluster.Close()
@@ -67,7 +67,7 @@ func TestClusterCreate(t *testing.T) {
 		assert.Nil(t, err)
 
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		defer cluster.Close()
@@ -82,7 +82,7 @@ func TestClusterCreate(t *testing.T) {
 	t.Run("remove all nodes and restart with other nodes", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
@@ -103,7 +103,7 @@ func TestClusterCreate(t *testing.T) {
 		cluster.Close()
 
 		sessionManager2 := NewSessionManager()
-		channelManager2, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager2, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		clusterReload := NewCluster(sessionManager2, channelManager2)
 		defer clusterReload.Close()
@@ -128,7 +128,7 @@ func TestClusterCreate(t *testing.T) {
 	t.Run("loadKv Fails", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		fkv := &loadPrefixFailKV{TxnKV: kv}
-		_, err := NewChannelManager(fkv, dummyPosProvider{})
+		_, err := NewChannelManager(fkv, newMockHandler())
 		assert.NotNil(t, err)
 	})
 }
@@ -147,7 +147,7 @@ func TestRegister(t *testing.T) {
 	t.Run("register to empty cluster", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		defer cluster.Close()
@@ -168,7 +168,7 @@ func TestRegister(t *testing.T) {
 	t.Run("register to empty cluster with buffer channels", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		err = channelManager.Watch(&channel{
 			Name:         "ch1",
@@ -197,7 +197,7 @@ func TestRegister(t *testing.T) {
 	t.Run("register and restart with no channel", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		addr := "localhost:8080"
@@ -212,7 +212,7 @@ func TestRegister(t *testing.T) {
 		cluster.Close()
 
 		sessionManager2 := NewSessionManager()
-		channelManager2, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager2, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		restartCluster := NewCluster(sessionManager2, channelManager2)
 		defer restartCluster.Close()
@@ -225,7 +225,7 @@ func TestUnregister(t *testing.T) {
 	t.Run("remove node after unregister", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		defer cluster.Close()
@@ -246,7 +246,7 @@ func TestUnregister(t *testing.T) {
 	t.Run("move channels to online nodes after unregister", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		defer cluster.Close()
@@ -280,7 +280,7 @@ func TestUnregister(t *testing.T) {
 		}
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		defer cluster.Close()
@@ -311,7 +311,7 @@ func TestWatchIfNeeded(t *testing.T) {
 		}
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		defer cluster.Close()
@@ -334,7 +334,7 @@ func TestWatchIfNeeded(t *testing.T) {
 	t.Run("watch channel to empty cluster", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
 		defer cluster.Close()
@@ -355,7 +355,7 @@ func TestConsistentHashPolicy(t *testing.T) {
 	sessionManager := NewSessionManager()
 	chash := consistent.New()
 	factory := NewConsistentHashChannelPolicyFactory(chash)
-	channelManager, err := NewChannelManager(kv, dummyPosProvider{}, withFactory(factory))
+	channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(factory))
 	assert.Nil(t, err)
 	cluster := NewCluster(sessionManager, channelManager)
 	defer cluster.Close()
@@ -430,7 +430,7 @@ func TestConsistentHashPolicy(t *testing.T) {
 func TestCluster_Flush(t *testing.T) {
 	kv := memkv.NewMemoryKV()
 	sessionManager := NewSessionManager()
-	channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+	channelManager, err := NewChannelManager(kv, newMockHandler())
 	assert.Nil(t, err)
 	cluster := NewCluster(sessionManager, channelManager)
 	defer cluster.Close()

View File

@@ -1,38 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package datacoord
-
-import (
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-)
-
-// positionProvider provides vchannel pair related position pairs
-type positionProvider interface {
-	GetVChanPositions(channel string, collectionID UniqueID, paritionID UniqueID) *datapb.VchannelInfo
-}
-
-var _ positionProvider = (*dummyPosProvider)(nil)
-
-type dummyPosProvider struct{}
-
-//GetVChanPositions implements positionProvider
-func (dp dummyPosProvider) GetVChanPositions(channel string, collectionID UniqueID, paritionID UniqueID) *datapb.VchannelInfo {
-	return &datapb.VchannelInfo{
-		CollectionID: collectionID,
-		ChannelName:  channel,
-	}
-}
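The interface deleted above does not vanish: its single method carries over as the first method of the new Handler interface introduced in the next file, joined by CheckShouldDropChannel, which the startup unwatch pass relies on (the paritionID parameter-name typo is fixed in the move):

	// Before: position lookup only.
	type positionProvider interface {
		GetVChanPositions(channel string, collectionID UniqueID, paritionID UniqueID) *datapb.VchannelInfo
	}

	// After: position lookup plus the drop check used by unwatchDroppedChannels.
	type Handler interface {
		GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo
		CheckShouldDropChannel(channel string) bool
	}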

View File

@@ -0,0 +1,134 @@
+package datacoord
+
+import (
+	"context"
+
+	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/internalpb"
+	"github.com/milvus-io/milvus/internal/rootcoord"
+	"go.uber.org/zap"
+)
+
+// Handler provides the channel-related queries ChannelManager needs from the
+// server: vchannel position lookup and the channel drop check.
+type Handler interface {
+	GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo
+	CheckShouldDropChannel(channel string) bool
+}
+
+// ServerHandler is a helper of Server
+type ServerHandler struct {
+	s *Server
+}
+
+func newServerHandler(s *Server) *ServerHandler {
+	return &ServerHandler{s: s}
+}
+
+// GetVChanPositions gets the latest vchannel positions for the provided dml channel name
+func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
+	segments := h.s.meta.GetSegmentsByChannel(channel)
+	log.Debug("GetSegmentsByChannel",
+		zap.Any("collectionID", collectionID),
+		zap.Any("channel", channel),
+		zap.Any("numOfSegments", len(segments)),
+	)
+	var flushed []*datapb.SegmentInfo
+	var unflushed []*datapb.SegmentInfo
+	var seekPosition *internalpb.MsgPosition
+	for _, s := range segments {
+		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
+			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
+			continue
+		}
+
+		if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
+			flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
+		} else {
+			unflushed = append(unflushed, s.SegmentInfo)
+		}
+
+		var segmentPosition *internalpb.MsgPosition
+		if s.GetDmlPosition() != nil {
+			segmentPosition = s.GetDmlPosition()
+		} else {
+			segmentPosition = s.GetStartPosition()
+		}
+
+		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
+			seekPosition = segmentPosition
+		}
+	}
+
+	// use collection start position when segment position is not found
+	if seekPosition == nil {
+		collection := h.GetCollection(h.s.ctx, collectionID)
+		if collection != nil {
+			seekPosition = getCollectionStartPosition(channel, collection)
+		}
+	}
+
+	return &datapb.VchannelInfo{
+		CollectionID:      collectionID,
+		ChannelName:       channel,
+		SeekPosition:      seekPosition,
+		FlushedSegments:   flushed,
+		UnflushedSegments: unflushed,
+	}
+}
+
+func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition {
+	for _, sp := range collectionInfo.GetStartPositions() {
+		if sp.GetKey() != rootcoord.ToPhysicalChannel(channel) {
+			continue
+		}
+		return &internalpb.MsgPosition{
+			ChannelName: channel,
+			MsgID:       sp.GetData(),
+		}
+	}
+	return nil
+}
+
+// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
+func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
+	return &datapb.SegmentInfo{
+		ID:             info.ID,
+		CollectionID:   info.CollectionID,
+		PartitionID:    info.PartitionID,
+		InsertChannel:  info.InsertChannel,
+		NumOfRows:      info.NumOfRows,
+		State:          info.State,
+		MaxRowNum:      info.MaxRowNum,
+		LastExpireTime: info.LastExpireTime,
+		StartPosition:  info.StartPosition,
+		DmlPosition:    info.DmlPosition,
+	}
+}
+
+// GetCollection returns the collection from meta, loading it from RootCoord first if absent
+func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) *datapb.CollectionInfo {
+	coll := h.s.meta.GetCollection(collectionID)
+	if coll != nil {
+		return coll
+	}
+	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
+	if err != nil {
+		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
+	}
+	return h.s.meta.GetCollection(collectionID)
+}
+
+// CheckShouldDropChannel returns whether the channel can be dropped: only when
+// every segment on it is empty, compaction-generated, or already dropped
+func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
+	segments := h.s.meta.GetSegmentsByChannel(channel)
+	for _, segment := range segments {
+		if segment.GetStartPosition() != nil && // filter empty segments
+			// FIXME: this also filters compaction-generated segments, because a
+			// datanode may not know such a segment due to network lag or a
+			// datacoord crash while handling CompleteCompaction.
+			len(segment.CompactionFrom) == 0 &&
+			segment.GetState() != commonpb.SegmentState_Dropped {
+			return false
+		}
+	}
+	return true
+}
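The drop check above is a negated conjunction inside a loop, which is easy to misread. Restated positively under hypothetical names (segmentIsLive and shouldDropChannel are not part of this commit): a segment counts as live when it has a start position, was not produced by compaction, and is not already Dropped; a channel should be dropped only if none of its segments is live.

	// Hypothetical restatement of ServerHandler.CheckShouldDropChannel's predicate.
	func segmentIsLive(s *datapb.SegmentInfo) bool {
		return s.GetStartPosition() != nil && // not an empty segment
			len(s.GetCompactionFrom()) == 0 && // not generated by compaction
			s.GetState() != commonpb.SegmentState_Dropped // not already dropped
	}

	func shouldDropChannel(segments []*datapb.SegmentInfo) bool {
		for _, s := range segments {
			if segmentIsLive(s) {
				return false
			}
		}
		return true
	}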

View File

@@ -584,3 +584,21 @@ func (t *mockCompactionTrigger) stop() {
 	}
 	panic("not implemented")
 }
+
+type mockHandler struct {
+}
+
+func newMockHandler() *mockHandler {
+	return &mockHandler{}
+}
+
+func (h *mockHandler) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
+	return &datapb.VchannelInfo{
+		CollectionID: collectionID,
+		ChannelName:  channel,
+	}
+}
+
+func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
+	return false
+}

View File

@@ -28,7 +28,6 @@ import (
 	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
 	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
 	"github.com/milvus-io/milvus/internal/logutil"
-	"github.com/milvus-io/milvus/internal/rootcoord"
 	"github.com/milvus-io/milvus/internal/util/metricsinfo"
 	"github.com/milvus-io/milvus/internal/util/mqclient"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
@@ -46,7 +45,6 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/milvuspb"
 )
@@ -89,9 +87,6 @@ type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEnd
 // makes sure Server implements `DataCoord`
 var _ types.DataCoord = (*Server)(nil)
 
-// makes sure Server implements `positionProvider`
-var _ positionProvider = (*Server)(nil)
-
 // Server implements `types.Datacoord`
 // handles Data Cooridinator related jobs
 type Server struct {
@@ -112,6 +107,7 @@ type Server struct {
 	rootCoordClient  types.RootCoord
 	garbageCollector *garbageCollector
 	gcOpt            GcOption
+	handler          Handler
 
 	compactionTrigger trigger
 	compactionHandler compactionPlanContext
@@ -248,6 +244,8 @@ func (s *Server) Start() error {
 		return err
 	}
 
+	s.handler = newServerHandler(s)
+
 	if err = s.initCluster(); err != nil {
 		return err
 	}
@@ -282,7 +280,7 @@ func (s *Server) initCluster() error {
 	}
 
 	var err error
-	s.channelManager, err = NewChannelManager(s.kvClient, s)
+	s.channelManager, err = NewChannelManager(s.kvClient, s.handler)
 	if err != nil {
 		return err
 	}
@@ -783,96 +781,3 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
 	s.meta.AddCollection(collInfo)
 	return nil
 }
-
-// GetVChanPositions get vchannel latest postitions with provided dml channel names
-func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
-	segments := s.meta.GetSegmentsByChannel(channel)
-	log.Debug("GetSegmentsByChannel",
-		zap.Any("collectionID", collectionID),
-		zap.Any("channel", channel),
-		zap.Any("numOfSegments", len(segments)),
-	)
-	var flushed []*datapb.SegmentInfo
-	var unflushed []*datapb.SegmentInfo
-	var seekPosition *internalpb.MsgPosition
-	for _, s := range segments {
-		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
-			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
-			continue
-		}
-
-		if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
-			flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
-		} else {
-			unflushed = append(unflushed, s.SegmentInfo)
-		}
-
-		var segmentPosition *internalpb.MsgPosition
-		if s.GetDmlPosition() != nil {
-			segmentPosition = s.GetDmlPosition()
-		} else {
-			segmentPosition = s.GetStartPosition()
-		}
-
-		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
-			seekPosition = segmentPosition
-		}
-	}
-
-	// use collection start position when segment position is not found
-	if seekPosition == nil {
-		collection := s.GetCollection(s.ctx, collectionID)
-		if collection != nil {
-			seekPosition = getCollectionStartPosition(channel, collection)
-		}
-	}
-
-	return &datapb.VchannelInfo{
-		CollectionID:      collectionID,
-		ChannelName:       channel,
-		SeekPosition:      seekPosition,
-		FlushedSegments:   flushed,
-		UnflushedSegments: unflushed,
-	}
-}
-
-func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition {
-	for _, sp := range collectionInfo.GetStartPositions() {
-		if sp.GetKey() != rootcoord.ToPhysicalChannel(channel) {
-			continue
-		}
-		return &internalpb.MsgPosition{
-			ChannelName: channel,
-			MsgID:       sp.GetData(),
-		}
-	}
-	return nil
-}
-
-// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
-func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
-	return &datapb.SegmentInfo{
-		ID:             info.ID,
-		CollectionID:   info.CollectionID,
-		PartitionID:    info.PartitionID,
-		InsertChannel:  info.InsertChannel,
-		NumOfRows:      info.NumOfRows,
-		State:          info.State,
-		MaxRowNum:      info.MaxRowNum,
-		LastExpireTime: info.LastExpireTime,
-		StartPosition:  info.StartPosition,
-		DmlPosition:    info.DmlPosition,
-	}
-}
-
-func (s *Server) GetCollection(ctx context.Context, collectionID UniqueID) *datapb.CollectionInfo {
-	coll := s.meta.GetCollection(collectionID)
-	if coll != nil {
-		return coll
-	}
-	err := s.loadCollectionFromRootCoord(ctx, collectionID)
-	if err != nil {
-		log.Warn("failed to load collection from RootCoord", zap.Int64("collectionID", collectionID), zap.Error(err))
-	}
-	return s.meta.GetCollection(collectionID)
-}
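Ordering note on the wiring above: Start assigns s.handler before initCluster runs, so the Handler handed to NewChannelManager is non-nil by the time Startup's unwatchDroppedChannels consults it.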

View File

@@ -1182,13 +1182,13 @@ func TestGetVChannelPos(t *testing.T) {
 	assert.Nil(t, err)
 
 	t.Run("get unexisted channel", func(t *testing.T) {
-		vchan := svr.GetVChanPositions("chx1", 0, allPartitionID)
+		vchan := svr.handler.GetVChanPositions("chx1", 0, allPartitionID)
 		assert.Empty(t, vchan.UnflushedSegments)
 		assert.Empty(t, vchan.FlushedSegments)
 	})
 
 	t.Run("get existed channel", func(t *testing.T) {
-		vchan := svr.GetVChanPositions("ch1", 0, allPartitionID)
+		vchan := svr.handler.GetVChanPositions("ch1", 0, allPartitionID)
 		assert.EqualValues(t, 1, len(vchan.FlushedSegments))
 		assert.EqualValues(t, 1, vchan.FlushedSegments[0].ID)
 		assert.EqualValues(t, 2, len(vchan.UnflushedSegments))
@@ -1196,7 +1196,7 @@ func TestGetVChannelPos(t *testing.T) {
 	})
 
 	t.Run("empty collection", func(t *testing.T) {
-		infos := svr.GetVChanPositions("ch0_suffix", 1, allPartitionID)
+		infos := svr.handler.GetVChanPositions("ch0_suffix", 1, allPartitionID)
 		assert.EqualValues(t, 1, infos.CollectionID)
 		assert.EqualValues(t, 0, len(infos.FlushedSegments))
 		assert.EqualValues(t, 0, len(infos.UnflushedSegments))
@@ -1204,7 +1204,7 @@ func TestGetVChannelPos(t *testing.T) {
 	})
 
 	t.Run("filter partition", func(t *testing.T) {
-		infos := svr.GetVChanPositions("ch1", 0, 1)
+		infos := svr.handler.GetVChanPositions("ch1", 0, 1)
 		assert.EqualValues(t, 0, infos.CollectionID)
 		assert.EqualValues(t, 0, len(infos.FlushedSegments))
 		assert.EqualValues(t, 1, len(infos.UnflushedSegments))
@@ -1622,7 +1622,7 @@ func TestOptions(t *testing.T) {
 	t.Run("SetCluster", func(t *testing.T) {
 		kv := memkv.NewMemoryKV()
 		sessionManager := NewSessionManager()
-		channelManager, err := NewChannelManager(kv, dummyPosProvider{})
+		channelManager, err := NewChannelManager(kv, newMockHandler())
 		assert.Nil(t, err)
 		cluster := NewCluster(sessionManager, channelManager)
@@ -1670,7 +1670,7 @@ func (p *mockPolicyFactory) NewDeregisterPolicy() DeregisterPolicy {
 
 func TestHandleSessionEvent(t *testing.T) {
 	kv := memkv.NewMemoryKV()
-	channelManager, err := NewChannelManager(kv, dummyPosProvider{}, withFactory(&mockPolicyFactory{}))
+	channelManager, err := NewChannelManager(kv, newMockHandler(), withFactory(&mockPolicyFactory{}))
 	assert.Nil(t, err)
 	sessionManager := NewSessionManager()
 	cluster := NewCluster(sessionManager, channelManager)

View File

@@ -114,10 +114,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 			zap.String("channelName", r.GetChannelName()),
 			zap.Uint32("count", r.GetCount()))
 
-		if coll := s.GetCollection(ctx, r.CollectionID); coll == nil {
-			continue
-		}
-
 		s.cluster.Watch(r.ChannelName, r.CollectionID)
 
 		allocations, err := s.segmentManager.AllocSegment(ctx,
@@ -347,7 +343,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 	log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
 		zap.Any("meta", req.GetField2BinlogPaths()))
 
-	if req.GetDropped() && s.checkShouldDropChannel(channel) {
+	if req.GetDropped() && s.handler.CheckShouldDropChannel(channel) {
 		log.Debug("remove channel", zap.String("channel", channel))
 		err = s.channelManager.RemoveChannel(channel)
 		if err != nil {
@@ -378,21 +374,6 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 	return resp, nil
 }
 
-func (s *Server) checkShouldDropChannel(channel string) bool {
-	segments := s.meta.GetSegmentsByChannel(channel)
-	for _, segment := range segments {
-		if segment.GetStartPosition() != nil && // fitler empty segment
-			// FIXME: we filter compaction generated segments
-			// because datanode may not know the segment due to the network lag or
-			// datacoord crash when handling CompleteCompaction.
-			len(segment.CompactionFrom) == 0 &&
-			segment.GetState() != commonpb.SegmentState_Dropped {
-			return false
-		}
-	}
-	return true
-}
-
 // GetComponentStates returns DataCoord's current state
 func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
 	resp := &internalpb.ComponentStates{
@@ -521,7 +502,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
 	channels := dresp.GetVirtualChannelNames()
 	channelInfos := make([]*datapb.VchannelInfo, 0, len(channels))
 	for _, c := range channels {
-		channelInfo := s.GetVChanPositions(c, collectionID, partitionID)
+		channelInfo := s.handler.GetVChanPositions(c, collectionID, partitionID)
 		channelInfos = append(channelInfos, channelInfo)
 		log.Debug("datacoord append channelInfo in GetRecoveryInfo",
 			zap.Any("collectionID", collectionID),
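Taken together, the two call sites close the gap tracked in issue #11558: SaveBinlogPaths removes a channel once a datanode reports it dropped and CheckShouldDropChannel agrees, and if datacoord dies before that removal is persisted, the unwatchDroppedChannels pass in ChannelManager.Startup performs the same removal on the next boot.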