mirror of https://github.com/milvus-io/milvus.git
Rename dataservice to datacoordinator (#5904)
Signed-off-by: sunby <bingyi.sun@zilliz.com>pull/5913/head
parent
e51bba5143
commit
da1f9f9241
|
@ -21,10 +21,10 @@ msgChannel:
|
|||
queryNodeStats: "query-node-stats"
|
||||
# cmd for loadIndex, flush, etc...
|
||||
cmd: "cmd"
|
||||
dataServiceInsertChannel: "insert-channel-"
|
||||
dataServiceStatistic: "dataservice-statistics-channel"
|
||||
dataServiceTimeTick: "dataservice-timetick-channel"
|
||||
dataServiceSegmentInfo: "segment-info-channel"
|
||||
dataCoordInsertChannel: "insert-channel-"
|
||||
dataCoordStatistic: "datacoord-statistics-channel"
|
||||
dataCoordTimeTick: "datacoord-timetick-channel"
|
||||
dataCoordSegmentInfo: "segment-info-channel"
|
||||
|
||||
# sub name generation rule: ${subNamePrefix}-${NodeID}
|
||||
subNamePrefix:
|
||||
|
@ -32,4 +32,5 @@ msgChannel:
|
|||
proxySubNamePrefix: "proxy"
|
||||
queryNodeSubNamePrefix: "queryNode"
|
||||
dataNodeSubNamePrefix: "dataNode"
|
||||
dataServiceSubNamePrefix: "dataService"
|
||||
dataCoordSubNamePrefix: "dataCoord"
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
# or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
dataservice:
|
||||
datacoord:
|
||||
segment:
|
||||
size: 512 # MB
|
||||
sizeFactor: 0.75
|
||||
|
|
|
@ -16,10 +16,10 @@ etcd:
|
|||
rootPath: by-dev
|
||||
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
|
||||
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
|
||||
segmentBinlogSubPath: dataservice/binlog/segment # Full Path = rootPath/metaSubPath/segmentBinlogSubPath
|
||||
collectionBinlogSubPath: dataservice/binlog/collection # Full Path = rootPath/metaSubPath/collectionBinglogSubPath
|
||||
flushStreamPosSubPath: dataservice/flushstream # Full path = rootPath/metaSubPath/flushStreamPosSubPath
|
||||
statsStreamPosSubPath: dataservice/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath
|
||||
segmentBinlogSubPath: datacoord/binlog/segment # Full Path = rootPath/metaSubPath/segmentBinlogSubPath
|
||||
collectionBinlogSubPath: datacoord/binlog/collection # Full Path = rootPath/metaSubPath/collectionBinglogSubPath
|
||||
flushStreamPosSubPath: datacoord/flushstream # Full path = rootPath/metaSubPath/flushStreamPosSubPath
|
||||
statsStreamPosSubPath: datacoord/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath
|
||||
|
||||
minio:
|
||||
address: localhost
|
||||
|
|
|
@ -150,7 +150,7 @@ func (p *ParamTable) initPulsarAddress() {
|
|||
|
||||
func (p *ParamTable) initSegmentStatisticsChannelName() {
|
||||
|
||||
path, err := p.Load("msgChannel.chanNamePrefix.dataServiceStatistic")
|
||||
path, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ func (p *ParamTable) initSegmentStatisticsChannelName() {
|
|||
}
|
||||
|
||||
func (p *ParamTable) initTimeTickChannelName() {
|
||||
path, err := p.Load("msgChannel.chanNamePrefix.dataServiceTimeTick")
|
||||
path, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ type ParamTable struct {
|
|||
StatisticsChannelName string
|
||||
TimeTickChannelName string
|
||||
SegmentInfoChannelName string
|
||||
DataServiceSubscriptionName string
|
||||
DataCoordSubscriptionName string
|
||||
|
||||
Log log.Config
|
||||
}
|
||||
|
@ -83,7 +83,7 @@ func (p *ParamTable) Init() {
|
|||
p.initStatisticsChannelName()
|
||||
p.initTimeTickChannelName()
|
||||
p.initSegmentInfoChannelName()
|
||||
p.initDataServiceSubscriptionName()
|
||||
p.initDataCoordSubscriptionName()
|
||||
p.initLogCfg()
|
||||
|
||||
p.initFlushStreamPosSubPath()
|
||||
|
@ -148,20 +148,20 @@ func (p *ParamTable) initCollectionBinlogSubPath() {
|
|||
}
|
||||
|
||||
func (p *ParamTable) initSegmentSize() {
|
||||
p.SegmentSize = p.ParseFloat("dataservice.segment.size")
|
||||
p.SegmentSize = p.ParseFloat("datacoord.segment.size")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initSegmentSizeFactor() {
|
||||
p.SegmentSizeFactor = p.ParseFloat("dataservice.segment.sizeFactor")
|
||||
p.SegmentSizeFactor = p.ParseFloat("datacoord.segment.sizeFactor")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initSegIDAssignExpiration() {
|
||||
p.SegIDAssignExpiration = p.ParseInt64("dataservice.segment.IDAssignExpiration") //ms
|
||||
p.SegIDAssignExpiration = p.ParseInt64("datacoord.segment.IDAssignExpiration") //ms
|
||||
}
|
||||
|
||||
func (p *ParamTable) initInsertChannelPrefixName() {
|
||||
var err error
|
||||
p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel")
|
||||
p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataCoordInsertChannel")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -169,7 +169,7 @@ func (p *ParamTable) initInsertChannelPrefixName() {
|
|||
|
||||
func (p *ParamTable) initStatisticsChannelName() {
|
||||
var err error
|
||||
p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceStatistic")
|
||||
p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordStatistic")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ func (p *ParamTable) initStatisticsChannelName() {
|
|||
|
||||
func (p *ParamTable) initTimeTickChannelName() {
|
||||
var err error
|
||||
p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceTimeTick")
|
||||
p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -185,15 +185,15 @@ func (p *ParamTable) initTimeTickChannelName() {
|
|||
|
||||
func (p *ParamTable) initSegmentInfoChannelName() {
|
||||
var err error
|
||||
p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo")
|
||||
p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParamTable) initDataServiceSubscriptionName() {
|
||||
func (p *ParamTable) initDataCoordSubscriptionName() {
|
||||
var err error
|
||||
p.DataServiceSubscriptionName, err = p.Load("msgChannel.subNamePrefix.dataServiceSubNamePrefix")
|
||||
p.DataCoordSubscriptionName, err = p.Load("msgChannel.subNamePrefix.dataCoordSubNamePrefix")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ func (p *ParamTable) initLogCfg() {
|
|||
panic(err)
|
||||
}
|
||||
if len(rootPath) != 0 {
|
||||
p.Log.File.Filename = path.Join(rootPath, "dataservice-"+strconv.FormatInt(p.NodeID, 10)+".log")
|
||||
p.Log.File.Filename = path.Join(rootPath, "datacoord-"+strconv.FormatInt(p.NodeID, 10)+".log")
|
||||
} else {
|
||||
p.Log.File.Filename = ""
|
||||
}
|
||||
|
|
|
@ -37,8 +37,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
)
|
||||
|
||||
const role = "dataservice"
|
||||
|
||||
const masterClientTimout = 20 * time.Second
|
||||
|
||||
type (
|
||||
|
@ -93,7 +91,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
|
|||
// Register register data service at etcd
|
||||
func (s *Server) Register() error {
|
||||
s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
s.activeCh = s.session.Init(typeutil.DataServiceRole, Params.IP, true)
|
||||
s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
|
||||
Params.NodeID = s.session.ServerID
|
||||
return nil
|
||||
}
|
||||
|
@ -160,7 +158,7 @@ func (s *Server) initCluster() error {
|
|||
func (s *Server) initServiceDiscovery() error {
|
||||
sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
|
||||
if err != nil {
|
||||
log.Debug("DataService initMeta failed", zap.Error(err))
|
||||
log.Debug("DataCoord initMeta failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("registered sessions", zap.Any("sessions", sessions))
|
||||
|
@ -175,7 +173,7 @@ func (s *Server) initServiceDiscovery() error {
|
|||
}
|
||||
|
||||
if err := s.cluster.startup(datanodes); err != nil {
|
||||
log.Debug("DataService loadMetaFromMaster failed", zap.Error(err))
|
||||
log.Debug("DataCoord loadMetaFromMaster failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -195,7 +193,7 @@ func (s *Server) initSegmentInfoChannel() error {
|
|||
return err
|
||||
}
|
||||
s.segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
|
||||
log.Debug("DataService AsProducer: " + Params.SegmentInfoChannelName)
|
||||
log.Debug("DataCoord AsProducer: " + Params.SegmentInfoChannelName)
|
||||
s.segmentInfoStream.Start()
|
||||
return nil
|
||||
}
|
||||
|
@ -224,7 +222,7 @@ func (s *Server) initFlushMsgStream() error {
|
|||
return err
|
||||
}
|
||||
s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName})
|
||||
log.Debug("dataservice AsProducer:" + Params.SegmentInfoChannelName)
|
||||
log.Debug("DataCoord AsProducer:" + Params.SegmentInfoChannelName)
|
||||
s.flushMsgStream.Start()
|
||||
|
||||
return nil
|
||||
|
@ -244,10 +242,10 @@ func (s *Server) startStatsChannel(ctx context.Context) {
|
|||
defer logutil.LogPanic()
|
||||
defer s.serverLoopWg.Done()
|
||||
statsStream, _ := s.msFactory.NewMsgStream(ctx)
|
||||
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
|
||||
log.Debug("dataservce stats stream",
|
||||
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
|
||||
log.Debug("DataCoord stats stream",
|
||||
zap.String("channelName", Params.StatisticsChannelName),
|
||||
zap.String("descriptionName", Params.DataServiceSubscriptionName))
|
||||
zap.String("descriptionName", Params.DataCoordSubscriptionName))
|
||||
statsStream.Start()
|
||||
defer statsStream.Close()
|
||||
for {
|
||||
|
@ -284,9 +282,9 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
|
|||
return
|
||||
}
|
||||
ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
|
||||
Params.DataServiceSubscriptionName)
|
||||
log.Debug(fmt.Sprintf("dataservice AsConsumer:%s:%s",
|
||||
Params.TimeTickChannelName, Params.DataServiceSubscriptionName))
|
||||
Params.DataCoordSubscriptionName)
|
||||
log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s",
|
||||
Params.TimeTickChannelName, Params.DataCoordSubscriptionName))
|
||||
ttMsgStream.Start()
|
||||
defer ttMsgStream.Close()
|
||||
for {
|
||||
|
@ -454,7 +452,7 @@ func (s *Server) Stop() error {
|
|||
if !atomic.CompareAndSwapInt64(&s.isServing, 2, 0) {
|
||||
return nil
|
||||
}
|
||||
log.Debug("dataservice server shutdown")
|
||||
log.Debug("DataCoord server shutdown")
|
||||
atomic.StoreInt64(&s.isServing, 0)
|
||||
s.cluster.releaseSessions()
|
||||
s.segmentInfoStream.Close()
|
||||
|
|
|
@ -45,7 +45,7 @@ type Client struct {
|
|||
}
|
||||
|
||||
func getDataServiceAddress(sess *sessionutil.Session) (string, error) {
|
||||
key := typeutil.DataServiceRole
|
||||
key := typeutil.DataCoordRole
|
||||
msess, _, err := sess.GetSessions(key)
|
||||
if err != nil {
|
||||
log.Debug("DataServiceClient, getSessions failed", zap.Any("key", key), zap.Error(err))
|
||||
|
|
|
@ -148,7 +148,7 @@ func (p *ParamTable) initStatisticsChannelName() {
|
|||
}
|
||||
|
||||
func (p *ParamTable) initSegmentInfoChannelName() {
|
||||
channel, err := p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo")
|
||||
channel, err := p.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -22,6 +22,6 @@ const (
|
|||
QueryNodeRole = "QueryNode"
|
||||
IndexServiceRole = "IndexService"
|
||||
IndexNodeRole = "IndexNode"
|
||||
DataServiceRole = "DataService"
|
||||
DataCoordRole = "DataCoord"
|
||||
DataNodeRole = "DataNode"
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue