diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index ae52394aa2..2d125748b0 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -21,10 +21,10 @@ msgChannel: queryNodeStats: "query-node-stats" # cmd for loadIndex, flush, etc... cmd: "cmd" - dataServiceInsertChannel: "insert-channel-" - dataServiceStatistic: "dataservice-statistics-channel" - dataServiceTimeTick: "dataservice-timetick-channel" - dataServiceSegmentInfo: "segment-info-channel" + dataCoordInsertChannel: "insert-channel-" + dataCoordStatistic: "datacoord-statistics-channel" + dataCoordTimeTick: "datacoord-timetick-channel" + dataCoordSegmentInfo: "segment-info-channel" # sub name generation rule: ${subNamePrefix}-${NodeID} subNamePrefix: @@ -32,4 +32,5 @@ msgChannel: proxySubNamePrefix: "proxy" queryNodeSubNamePrefix: "queryNode" dataNodeSubNamePrefix: "dataNode" - dataServiceSubNamePrefix: "dataService" + dataCoordSubNamePrefix: "dataCoord" + diff --git a/configs/advanced/data_service.yaml b/configs/advanced/data_service.yaml index b3b602a20f..0562929147 100644 --- a/configs/advanced/data_service.yaml +++ b/configs/advanced/data_service.yaml @@ -9,7 +9,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. -dataservice: +datacoord: segment: size: 512 # MB sizeFactor: 0.75 diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 7f5ae838b7..f128ed48ff 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -16,10 +16,10 @@ etcd: rootPath: by-dev metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath - segmentBinlogSubPath: dataservice/binlog/segment # Full Path = rootPath/metaSubPath/segmentBinlogSubPath - collectionBinlogSubPath: dataservice/binlog/collection # Full Path = rootPath/metaSubPath/collectionBinglogSubPath - flushStreamPosSubPath: dataservice/flushstream # Full path = rootPath/metaSubPath/flushStreamPosSubPath - statsStreamPosSubPath: dataservice/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath + segmentBinlogSubPath: datacoord/binlog/segment # Full Path = rootPath/metaSubPath/segmentBinlogSubPath + collectionBinlogSubPath: datacoord/binlog/collection # Full Path = rootPath/metaSubPath/collectionBinglogSubPath + flushStreamPosSubPath: datacoord/flushstream # Full path = rootPath/metaSubPath/flushStreamPosSubPath + statsStreamPosSubPath: datacoord/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath minio: address: localhost diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index 21101996c2..c264af8955 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -150,7 +150,7 @@ func (p *ParamTable) initPulsarAddress() { func (p *ParamTable) initSegmentStatisticsChannelName() { - path, err := p.Load("msgChannel.chanNamePrefix.dataServiceStatistic") + path, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic") if err != nil { panic(err) } @@ -158,7 +158,7 @@ func (p *ParamTable) initSegmentStatisticsChannelName() { } func (p *ParamTable) initTimeTickChannelName() { - path, err := p.Load("msgChannel.chanNamePrefix.dataServiceTimeTick") + path, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick") if err != nil { panic(err) } diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 06d6cc8170..238ebde08a 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -46,11 +46,11 @@ type ParamTable struct { SegmentSizeFactor float64 SegIDAssignExpiration int64 - InsertChannelPrefixName string - StatisticsChannelName string - TimeTickChannelName string - SegmentInfoChannelName string - DataServiceSubscriptionName string + InsertChannelPrefixName string + StatisticsChannelName string + TimeTickChannelName string + SegmentInfoChannelName string + DataCoordSubscriptionName string Log log.Config } @@ -83,7 +83,7 @@ func (p *ParamTable) Init() { p.initStatisticsChannelName() p.initTimeTickChannelName() p.initSegmentInfoChannelName() - p.initDataServiceSubscriptionName() + p.initDataCoordSubscriptionName() p.initLogCfg() p.initFlushStreamPosSubPath() @@ -148,20 +148,20 @@ func (p *ParamTable) initCollectionBinlogSubPath() { } func (p *ParamTable) initSegmentSize() { - p.SegmentSize = p.ParseFloat("dataservice.segment.size") + p.SegmentSize = p.ParseFloat("datacoord.segment.size") } func (p *ParamTable) initSegmentSizeFactor() { - p.SegmentSizeFactor = p.ParseFloat("dataservice.segment.sizeFactor") + p.SegmentSizeFactor = p.ParseFloat("datacoord.segment.sizeFactor") } func (p *ParamTable) initSegIDAssignExpiration() { - p.SegIDAssignExpiration = p.ParseInt64("dataservice.segment.IDAssignExpiration") //ms + p.SegIDAssignExpiration = p.ParseInt64("datacoord.segment.IDAssignExpiration") //ms } func (p *ParamTable) initInsertChannelPrefixName() { var err error - p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel") + p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataCoordInsertChannel") if err != nil { panic(err) } @@ -169,7 +169,7 @@ func (p *ParamTable) initInsertChannelPrefixName() { func (p *ParamTable) initStatisticsChannelName() { var err error - p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceStatistic") + p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordStatistic") if err != nil { panic(err) } @@ -177,7 +177,7 @@ func (p *ParamTable) initStatisticsChannelName() { func (p *ParamTable) initTimeTickChannelName() { var err error - p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceTimeTick") + p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick") if err != nil { panic(err) } @@ -185,15 +185,15 @@ func (p *ParamTable) initTimeTickChannelName() { func (p *ParamTable) initSegmentInfoChannelName() { var err error - p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo") + p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo") if err != nil { panic(err) } } -func (p *ParamTable) initDataServiceSubscriptionName() { +func (p *ParamTable) initDataCoordSubscriptionName() { var err error - p.DataServiceSubscriptionName, err = p.Load("msgChannel.subNamePrefix.dataServiceSubNamePrefix") + p.DataCoordSubscriptionName, err = p.Load("msgChannel.subNamePrefix.dataCoordSubNamePrefix") if err != nil { panic(err) } @@ -219,7 +219,7 @@ func (p *ParamTable) initLogCfg() { panic(err) } if len(rootPath) != 0 { - p.Log.File.Filename = path.Join(rootPath, "dataservice-"+strconv.FormatInt(p.NodeID, 10)+".log") + p.Log.File.Filename = path.Join(rootPath, "datacoord-"+strconv.FormatInt(p.NodeID, 10)+".log") } else { p.Log.File.Filename = "" } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 82cc7479f2..959990edc6 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -37,8 +37,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" ) -const role = "dataservice" - const masterClientTimout = 20 * time.Second type ( @@ -93,7 +91,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro // Register register data service at etcd func (s *Server) Register() error { s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints) - s.activeCh = s.session.Init(typeutil.DataServiceRole, Params.IP, true) + s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true) Params.NodeID = s.session.ServerID return nil } @@ -160,7 +158,7 @@ func (s *Server) initCluster() error { func (s *Server) initServiceDiscovery() error { sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole) if err != nil { - log.Debug("DataService initMeta failed", zap.Error(err)) + log.Debug("DataCoord initMeta failed", zap.Error(err)) return err } log.Debug("registered sessions", zap.Any("sessions", sessions)) @@ -175,7 +173,7 @@ func (s *Server) initServiceDiscovery() error { } if err := s.cluster.startup(datanodes); err != nil { - log.Debug("DataService loadMetaFromMaster failed", zap.Error(err)) + log.Debug("DataCoord loadMetaFromMaster failed", zap.Error(err)) return err } @@ -195,7 +193,7 @@ func (s *Server) initSegmentInfoChannel() error { return err } s.segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName}) - log.Debug("DataService AsProducer: " + Params.SegmentInfoChannelName) + log.Debug("DataCoord AsProducer: " + Params.SegmentInfoChannelName) s.segmentInfoStream.Start() return nil } @@ -224,7 +222,7 @@ func (s *Server) initFlushMsgStream() error { return err } s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName}) - log.Debug("dataservice AsProducer:" + Params.SegmentInfoChannelName) + log.Debug("DataCoord AsProducer:" + Params.SegmentInfoChannelName) s.flushMsgStream.Start() return nil @@ -244,10 +242,10 @@ func (s *Server) startStatsChannel(ctx context.Context) { defer logutil.LogPanic() defer s.serverLoopWg.Done() statsStream, _ := s.msFactory.NewMsgStream(ctx) - statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName) - log.Debug("dataservce stats stream", + statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName) + log.Debug("DataCoord stats stream", zap.String("channelName", Params.StatisticsChannelName), - zap.String("descriptionName", Params.DataServiceSubscriptionName)) + zap.String("descriptionName", Params.DataCoordSubscriptionName)) statsStream.Start() defer statsStream.Close() for { @@ -284,9 +282,9 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) { return } ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, - Params.DataServiceSubscriptionName) - log.Debug(fmt.Sprintf("dataservice AsConsumer:%s:%s", - Params.TimeTickChannelName, Params.DataServiceSubscriptionName)) + Params.DataCoordSubscriptionName) + log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s", + Params.TimeTickChannelName, Params.DataCoordSubscriptionName)) ttMsgStream.Start() defer ttMsgStream.Close() for { @@ -454,7 +452,7 @@ func (s *Server) Stop() error { if !atomic.CompareAndSwapInt64(&s.isServing, 2, 0) { return nil } - log.Debug("dataservice server shutdown") + log.Debug("DataCoord server shutdown") atomic.StoreInt64(&s.isServing, 0) s.cluster.releaseSessions() s.segmentInfoStream.Close() diff --git a/internal/distributed/dataservice/client/client.go b/internal/distributed/dataservice/client/client.go index 237827b224..9e94e20867 100644 --- a/internal/distributed/dataservice/client/client.go +++ b/internal/distributed/dataservice/client/client.go @@ -45,7 +45,7 @@ type Client struct { } func getDataServiceAddress(sess *sessionutil.Session) (string, error) { - key := typeutil.DataServiceRole + key := typeutil.DataCoordRole msess, _, err := sess.GetSessions(key) if err != nil { log.Debug("DataServiceClient, getSessions failed", zap.Any("key", key), zap.Error(err)) diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index 7007849fba..ab810530c4 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -148,7 +148,7 @@ func (p *ParamTable) initStatisticsChannelName() { } func (p *ParamTable) initSegmentInfoChannelName() { - channel, err := p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo") + channel, err := p.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo") if err != nil { panic(err) } diff --git a/internal/util/typeutil/type.go b/internal/util/typeutil/type.go index 52bae5971c..f5ec1d26d0 100644 --- a/internal/util/typeutil/type.go +++ b/internal/util/typeutil/type.go @@ -22,6 +22,6 @@ const ( QueryNodeRole = "QueryNode" IndexServiceRole = "IndexService" IndexNodeRole = "IndexNode" - DataServiceRole = "DataService" + DataCoordRole = "DataCoord" DataNodeRole = "DataNode" )