Set fixed insert channel number, remove channel manager

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-02-03 15:07:48 +08:00 committed by yefu.chen
parent 01e9dc8e3f
commit 208700b0e7
7 changed files with 32 additions and 99 deletions

View File

@ -9,5 +9,5 @@ dataservice:
defaultSizePerRecord: 1024
# old name: segmentExpireDuration: 2000
IDAssignExpiration: 2000 # ms
insertChannelNumPerCollection: 4
insertChannelNum: 16
dataNodeNum: 1

View File

@ -1,38 +0,0 @@
package dataservice
import (
"strconv"
"sync"
)
type (
insertChannelManager struct {
mu sync.RWMutex
count int
channelGroups map[UniqueID][]string // collection id to channel ranges
}
)
func newInsertChannelManager() *insertChannelManager {
return &insertChannelManager{
count: 0,
channelGroups: make(map[UniqueID][]string),
}
}
func (cm *insertChannelManager) GetChannels(collectionID UniqueID) ([]string, error) {
cm.mu.Lock()
defer cm.mu.Unlock()
if _, ok := cm.channelGroups[collectionID]; ok {
return cm.channelGroups[collectionID], nil
}
channels := Params.InsertChannelNumPerCollection
cg := make([]string, channels)
var i int64 = 0
for ; i < channels; i++ {
cg[i] = Params.InsertChannelPrefixName + strconv.Itoa(cm.count)
cm.count++
}
cm.channelGroups[collectionID] = cg
return cg, nil
}

View File

@ -1,21 +0,0 @@
package dataservice
import (
"strconv"
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetChannel(t *testing.T) {
Params.Init()
Params.InsertChannelNumPerCollection = 4
Params.InsertChannelPrefixName = "channel"
manager := newInsertChannelManager()
channels, err := manager.GetChannels(1)
assert.Nil(t, err)
assert.EqualValues(t, Params.InsertChannelNumPerCollection, len(channels))
for i := 0; i < len(channels); i++ {
assert.EqualValues(t, Params.InsertChannelPrefixName+strconv.Itoa(i), channels[i])
}
}

View File

@ -2,7 +2,6 @@ package dataservice
import (
"log"
"sort"
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -23,18 +22,16 @@ type (
channelNum int
}
dataNodeCluster struct {
mu sync.RWMutex
finishCh chan struct{}
nodes []*dataNode
watchedCollection map[UniqueID]bool
mu sync.RWMutex
finishCh chan struct{}
nodes []*dataNode
}
)
func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster {
return &dataNodeCluster{
finishCh: finishCh,
nodes: make([]*dataNode, 0),
watchedCollection: make(map[UniqueID]bool),
finishCh: finishCh,
nodes: make([]*dataNode, 0),
}
}
@ -72,13 +69,9 @@ func (c *dataNodeCluster) GetNodeIDs() []int64 {
return ret
}
func (c *dataNodeCluster) WatchInsertChannels(collectionID UniqueID, channels []string) {
func (c *dataNodeCluster) WatchInsertChannels(channels []string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.watchedCollection[collectionID] {
return
}
sort.Slice(c.nodes, func(i, j int) bool { return c.nodes[i].channelNum < c.nodes[j].channelNum })
var groups [][]string
if len(channels) < len(c.nodes) {
groups = make([][]string, len(channels))
@ -109,7 +102,6 @@ func (c *dataNodeCluster) WatchInsertChannels(collectionID UniqueID, channels []
}
c.nodes[i].channelNum += len(group)
}
c.watchedCollection[collectionID] = true
}
func (c *dataNodeCluster) GetDataNodeStates() ([]*internalpb2.ComponentInfo, error) {
@ -153,5 +145,4 @@ func (c *dataNodeCluster) Clear() {
defer c.mu.Unlock()
c.finishCh = make(chan struct{})
c.nodes = make([]*dataNode, 0)
c.watchedCollection = make(map[UniqueID]bool)
}

View File

@ -33,7 +33,7 @@ func TestWatchChannels(t *testing.T) {
channelNum: 0,
})
}
cluster.WatchInsertChannels(c.collectionID, c.channels)
cluster.WatchInsertChannels(c.channels)
for i := 0; i < len(cluster.nodes); i++ {
assert.EqualValues(t, c.channelNums[i], cluster.nodes[i].channelNum)
}

View File

@ -24,14 +24,14 @@ type ParamTable struct {
DefaultRecordSize int64
SegIDAssignExpiration int64
InsertChannelPrefixName string
InsertChannelNumPerCollection int64
StatisticsChannelName string
TimeTickChannelName string
DataNodeNum int
SegmentInfoChannelName string
DataServiceSubscriptionName string
K2SChannelNames []string
InsertChannelPrefixName string
InsertChannelNum int64
StatisticsChannelName string
TimeTickChannelName string
DataNodeNum int
SegmentInfoChannelName string
DataServiceSubscriptionName string
K2SChannelNames []string
SegmentFlushMetaPath string
}
@ -61,7 +61,7 @@ func (p *ParamTable) Init() {
p.initDefaultRecordSize()
p.initSegIDAssignExpiration()
p.initInsertChannelPrefixName()
p.initInsertChannelNumPerCollection()
p.initInsertChannelNum()
p.initStatisticsChannelName()
p.initTimeTickChannelName()
p.initDataNodeNum()
@ -150,8 +150,8 @@ func (p *ParamTable) initInsertChannelPrefixName() {
}
}
func (p *ParamTable) initInsertChannelNumPerCollection() {
p.InsertChannelNumPerCollection = p.ParseInt64("dataservice.insertChannelNumPerCollection")
func (p *ParamTable) initInsertChannelNum() {
p.InsertChannelNum = p.ParseInt64("dataservice.insertChannelNum")
}
func (p *ParamTable) initStatisticsChannelName() {

View File

@ -85,7 +85,6 @@ type (
segAllocator segmentAllocator
statsHandler *statsHandler
ddHandler *ddHandler
insertChannelMgr *insertChannelManager
allocator allocator
cluster *dataNodeCluster
msgProducer *timesync.MsgProducer
@ -95,6 +94,7 @@ type (
k2sMsgStream msgstream.MsgStream
ddChannelName string
segmentInfoStream msgstream.MsgStream
insertChannels []string
}
)
@ -103,14 +103,23 @@ func CreateServer(ctx context.Context) (*Server, error) {
ch := make(chan struct{})
s := &Server{
ctx: ctx,
insertChannelMgr: newInsertChannelManager(),
registerFinishCh: ch,
cluster: newDataNodeCluster(ch),
}
s.insertChannels = s.getInsertChannels()
s.state.Store(internalpb2.StateCode_INITIALIZING)
return s, nil
}
func (s *Server) getInsertChannels() []string {
channels := make([]string, Params.InsertChannelNum)
var i int64 = 0
for ; i < Params.InsertChannelNum; i++ {
channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10)
}
return channels
}
func (s *Server) SetMasterClient(masterClient MasterClient) {
s.masterClient = masterClient
}
@ -137,6 +146,7 @@ func (s *Server) Start() error {
}
s.startServerLoop()
s.waitDataNodeRegister()
s.cluster.WatchInsertChannels(s.insertChannels)
if err = s.initMsgProducer(); err != nil {
return err
}
@ -675,16 +685,7 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
}
func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
if !s.checkStateIsHealthy() {
return nil, errors.New("server is initializing")
}
channels, err := s.insertChannelMgr.GetChannels(req.CollectionID)
if err != nil {
return nil, err
}
s.cluster.WatchInsertChannels(req.CollectionID, channels)
return channels, nil
return s.insertChannels, nil
}
func (s *Server) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {