diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go
index c204f8d223..a6f2e76c9e 100644
--- a/cmd/singlenode/main.go
+++ b/cmd/singlenode/main.go
@@ -4,6 +4,8 @@ import (
 	"os"
 
 	"github.com/zilliztech/milvus-distributed/cmd/distributed/roles"
+	"github.com/zilliztech/milvus-distributed/internal/log"
+	"github.com/zilliztech/milvus-distributed/internal/logutil"
 )
 
 func initRoles(roles *roles.MilvusRoles) {
@@ -19,9 +21,24 @@ func initRoles(roles *roles.MilvusRoles) {
 	roles.EnableMsgStreamService = true
 }
 
+func initLogCfg() log.Config {
+	logCfg := log.Config{}
+	logCfg.Format = "text"
+	logCfg.Level = "debug"
+	logCfg.Development = true
+	logCfg.File.MaxSize = 300
+	logCfg.File.MaxBackups = 20
+	logCfg.File.MaxDays = 10
+	logCfg.File.Filename = "/tmp/milvus/singlenode.log"
+	return logCfg
+}
+
 func main() {
 	var roles roles.MilvusRoles
 	initRoles(&roles)
 	os.Setenv("QUERY_NODE_ID", "1")
+
+	logCfg := initLogCfg()
+	logutil.SetupLogger(&logCfg)
 	roles.Run(true)
 }
diff --git a/go.sum b/go.sum
index 56765485b7..92604d6535 100644
--- a/go.sum
+++ b/go.sum
@@ -288,6 +288,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/protocolbuffers/protobuf v3.15.3+incompatible h1:5WExaSYHEGvU73sVHvqe+3/APOOyCVg/pDCeAlfpCrw=
 github.com/protocolbuffers/protobuf v3.15.4+incompatible h1:Blv4dGFGqHXX+r5Tqoc1ziXPMDElqZ+/ryYcE4bddN4=
+github.com/protocolbuffers/protobuf v3.15.5+incompatible h1:NsnktN0DZ4i7hXZ6HPFH395SptFlMVhSc8XuhkiOwzI=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 686151ba35..763801e4dd 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -655,18 +655,21 @@ func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
 	//input stream, data node time tick
 	wTt, _ := factory.NewMsgStream(ctx)
 	wTt.AsProducer([]string{Params.TimeTickChannelName})
+	log.Debug("datanode AsProducer: " + Params.TimeTickChannelName)
 	var wTtMsgStream msgstream.MsgStream = wTt
 	wTtMsgStream.Start()
 
 	// update statistics channel
 	segS, _ := factory.NewMsgStream(ctx)
 	segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
+	log.Debug("datanode AsProducer: " + Params.SegmentStatisticsChannelName)
 	var segStatisticsMsgStream msgstream.MsgStream = segS
 	segStatisticsMsgStream.Start()
 
 	// segment flush completed channel
 	cf, _ := factory.NewMsgStream(ctx)
 	cf.AsProducer([]string{Params.CompleteFlushChannelName})
+	log.Debug("datanode AsProducer: " + Params.CompleteFlushChannelName)
 	var completeFlushStream msgstream.MsgStream = cf
 	completeFlushStream.Start()
 
diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go
index 96b5c2093c..b7bfd63f94 100644
--- a/internal/datanode/flow_graph_msg_stream_input_node.go
+++ b/internal/datanode/flow_graph_msg_stream_input_node.go
@@ -2,7 +2,9 @@ package datanode
 
 import (
 	"context"
+	"strings"
 
+	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) @@ -16,6 +18,7 @@ func newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.I insertStream, _ := factory.NewTtMsgStream(ctx) insertStream.AsConsumer(consumeChannels, consumeSubName) + log.Debug("datanode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName) var stream msgstream.MsgStream = insertStream node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism) @@ -30,6 +33,7 @@ func newDDInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.I tmpStream, _ := factory.NewTtMsgStream(ctx) tmpStream.AsConsumer(Params.DDChannelNames, consumeSubName) + log.Debug("datanode AsConsumer: " + strings.Join(Params.DDChannelNames, ", ") + " : " + consumeSubName) var stream msgstream.MsgStream = tmpStream node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism) diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index c4d389ef08..7864e37b34 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -6,6 +6,7 @@ import ( "fmt" "path" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -190,6 +191,7 @@ func (s *Server) initMeta() error { func (s *Server) initSegmentInfoChannel() { segmentInfoStream, _ := s.msFactory.NewMsgStream(s.ctx) segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName}) + log.Debug("dataservice AsProducer: " + Params.SegmentInfoChannelName) s.segmentInfoStream = segmentInfoStream s.segmentInfoStream.Start() } @@ -199,6 +201,7 @@ func (s *Server) initMsgProducer() error { return err } s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName) + log.Debug("dataservice AsConsumer: " + Params.TimeTickChannelName + " : " + Params.DataServiceSubscriptionName) s.ttMsgStream.Start() s.ttBarrier = timesync.NewHardTimeTickBarrier(s.ctx, s.ttMsgStream, s.cluster.GetNodeIDs()) s.ttBarrier.Start() @@ -206,6 +209,7 @@ func (s *Server) initMsgProducer() error { return err } s.k2sMsgStream.AsProducer(Params.K2SChannelNames) + log.Debug("dataservice AsProducer: " + strings.Join(Params.K2SChannelNames, ", ")) s.k2sMsgStream.Start() dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster) k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream) @@ -324,6 +328,7 @@ func (s *Server) startStatsChannel(ctx context.Context) { defer s.serverLoopWg.Done() statsStream, _ := s.msFactory.NewMsgStream(ctx) statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName) + log.Debug("dataservice AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName) statsStream.Start() defer statsStream.Close() for { @@ -353,6 +358,7 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { defer s.serverLoopWg.Done() flushStream, _ := s.msFactory.NewMsgStream(ctx) flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName) + log.Debug("dataservice AsConsumer: " + Params.SegmentInfoChannelName + " : " + Params.DataServiceSubscriptionName) flushStream.Start() defer flushStream.Close() for { @@ -388,6 +394,7 @@ func (s *Server) startDDChannel(ctx context.Context) { defer s.serverLoopWg.Done() ddStream, _ := s.msFactory.NewMsgStream(ctx) ddStream.AsConsumer([]string{s.ddChannelName}, Params.DataServiceSubscriptionName) + log.Debug("dataservice AsConsumer: " + s.ddChannelName + " : " + 
Params.DataServiceSubscriptionName) ddStream.Start() defer ddStream.Close() for { diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index d8fe513271..cbd2ed6e6f 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -454,6 +454,7 @@ func (c *Core) setMsgStreams() error { proxyTimeTickStream, _ := c.msFactory.NewMsgStream(c.ctx) proxyTimeTickStream.AsConsumer([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName) + log.Debug("master AsConsumer: " + Params.ProxyTimeTickChannel + " : " + Params.MsgChannelSubName) proxyTimeTickStream.Start() // master time tick channel @@ -462,6 +463,7 @@ func (c *Core) setMsgStreams() error { } timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx) timeTickStream.AsProducer([]string{Params.TimeTickChannel}) + log.Debug("masterservice AsProducer: " + Params.TimeTickChannel) // master dd channel if Params.DdChannel == "" { @@ -469,6 +471,7 @@ func (c *Core) setMsgStreams() error { } ddStream, _ := c.msFactory.NewMsgStream(c.ctx) ddStream.AsProducer([]string{Params.DdChannel}) + log.Debug("masterservice AsProducer: " + Params.DdChannel) c.SendTimeTick = func(t typeutil.Timestamp) error { msgPack := ms.MsgPack{} @@ -602,6 +605,7 @@ func (c *Core) setMsgStreams() error { } dataServiceStream, _ := c.msFactory.NewMsgStream(c.ctx) dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName) + log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + Params.MsgChannelSubName) dataServiceStream.Start() c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024) c.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 1024) diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go index 64f2ae3c36..73a276bfda 100644 --- a/internal/proxynode/insert_channels.go +++ b/internal/proxynode/insert_channels.go @@ -102,6 +102,8 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st stream, _ := m.msFactory.NewMsgStream(context.Background()) stream.AsProducer(channels) + // FIXME(wxyu): use log.Debug instead + log.Println("proxynode AsProducer: ", channels) repack := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) { return insertRepackFunc(tsMsgs, hashKeys, m.nodeInstance.segAssigner, true) } diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index b3f1c054c8..0e8b929c90 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -184,6 +184,8 @@ func (node *ProxyNode) Init() error { node.queryMsgStream, _ = node.msFactory.NewMsgStream(node.ctx) node.queryMsgStream.AsProducer(Params.SearchChannelNames) + // FIXME(wxyu): use log.Debug instead + log.Println("proxynode AsProducer: ", Params.SearchChannelNames) log.Println("create query message stream ...") masterAddr := Params.MasterAddress @@ -211,6 +213,8 @@ func (node *ProxyNode) Init() error { node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx) node.manipulationMsgStream.AsProducer(Params.InsertChannelNames) + // FIXME(wxyu): use log.Debug instead + log.Println("proxynode AsProducer: ", Params.InsertChannelNames) repackFunc := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) { return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true) } diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go index 
d29e272949..91132fd05b 100644 --- a/internal/proxynode/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -390,8 +390,8 @@ func (sched *TaskScheduler) queryResultLoop() { defer sched.wg.Done() queryResultMsgStream, _ := sched.msFactory.NewMsgStream(sched.ctx) - queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames, - Params.ProxySubName) + queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames, Params.ProxySubName) + log.Println("proxynode AsConsumer: ", Params.SearchResultChannelNames, " : ", Params.ProxySubName) queryNodeNum := Params.QueryNodeNum queryResultMsgStream.Start() diff --git a/internal/proxynode/timetick.go b/internal/proxynode/timetick.go index 08f8d5c64d..6e89e69cd8 100644 --- a/internal/proxynode/timetick.go +++ b/internal/proxynode/timetick.go @@ -55,6 +55,8 @@ func newTimeTick(ctx context.Context, t.tickMsgStream, _ = t.msFactory.NewMsgStream(t.ctx) t.tickMsgStream.AsProducer(Params.ProxyTimeTickChannelNames) + // FIXME(wxyu): use log.Debug instead + log.Println("proxynode AsProducer: ", Params.ProxyTimeTickChannelNames) return t } diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index d7a4fb26b5..e1fa985694 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -110,6 +110,8 @@ func (s *ProxyService) Init() error { serviceTimeTickMsgStream, _ := s.msFactory.NewTtMsgStream(s.ctx) serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel}) + // FIXME(wxyu): use log.Debug instead + log.Println("proxyservice AsProducer: ", []string{Params.ServiceTimeTickChannel}) log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel}) channels := make([]string, Params.InsertChannelNum) @@ -119,11 +121,14 @@ func (s *ProxyService) Init() error { } insertTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx) insertTickMsgStream.AsProducer(channels) + // FIXME(wxyu): use log.Debug instead + log.Println("proxyservice AsProducer: ", channels) log.Println("create insert time tick producer channel: ", channels) nodeTimeTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx) nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel, "proxyservicesub") // TODO: add config + log.Println("proxynode AsConsumer: ", Params.NodeTimeTickChannel, " : ", "proxyservicesub") log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel) ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{1}, 10) diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index b6e12942df..cd59e677cc 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -81,6 +81,7 @@ func newServiceTimeNode(ctx context.Context, replica ReplicaInterface, factory m timeTimeMsgStream, _ := factory.NewMsgStream(ctx) timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName}) + log.Debug("querynode AsProducer: " + Params.QueryTimeTickChannelName) return &serviceTimeNode{ baseNode: baseNode, diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 52326d4429..48f5dba9dc 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -15,6 +15,7 @@ import "C" import ( "context" "fmt" + "strings" "sync/atomic" "errors" @@ -301,10 +302,12 @@ func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*co consumeChannels := []string{in.RequestChannelID} 
 	consumeSubName := Params.MsgChannelSubName
 	node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
+	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
 
 	// add result channel
 	producerChannels := []string{in.ResultChannelID}
 	node.searchService.searchResultMsgStream.AsProducer(producerChannels)
+	log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
 
 	status := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -391,6 +394,7 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
 	consumeChannels := in.ChannelIDs
 	consumeSubName := Params.MsgChannelSubName
 	node.dataSyncService.dmStream.AsConsumer(consumeChannels, consumeSubName)
+	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
 
 	status := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go
index 018d351559..18f78b163e 100644
--- a/internal/querynode/search_service.go
+++ b/internal/querynode/search_service.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"regexp"
 	"strconv"
+	"strings"
 	"sync"
 
 	"go.uber.org/zap"
@@ -48,8 +49,10 @@ func newSearchService(ctx context.Context, replica ReplicaInterface, factory msg
 	consumeChannels := Params.SearchChannelNames
 	consumeSubName := Params.MsgChannelSubName
 	searchStream.AsConsumer(consumeChannels, consumeSubName)
+	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
 	producerChannels := Params.SearchResultChannelNames
 	searchResultStream.AsProducer(producerChannels)
+	log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
 
 	searchServiceCtx, searchServiceCancel := context.WithCancel(ctx)
 	msgBuffer := make(chan msgstream.TsMsg, receiveBufSize)
diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go
index 7d08d8528b..b4eacadbc5 100644
--- a/internal/querynode/stats_service.go
+++ b/internal/querynode/stats_service.go
@@ -2,6 +2,7 @@ package querynode
 
 import (
 	"context"
+	"strings"
 	"time"
 
 	"github.com/zilliztech/milvus-distributed/internal/log"
@@ -42,6 +43,7 @@ func (sService *statsService) start() {
 	statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx)
 	statsStream.AsProducer(producerChannels)
+	log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
 	var statsMsgStream msgstream.MsgStream = statsStream
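
A hedged aside on the FIXME(wxyu) markers above: they flag the log.Println calls in proxynode and proxyservice as placeholders to be replaced with log.Debug, which the datanode, dataservice, masterservice, and querynode hunks already use. Below is a minimal sketch of what that replacement could look like, reusing only the internal/log API already exercised in this diff (log.Debug with a plain string message); the helper names debugAsProducer/debugAsConsumer and their placement in package proxynode are hypothetical, not part of the commit:

    package proxynode

    import (
    	"strings"

    	"github.com/zilliztech/milvus-distributed/internal/log"
    )

    // debugAsProducer is a hypothetical helper mirroring the
    // "<role> AsProducer: ch1, ch2" message shape used by the
    // log.Debug calls elsewhere in this diff.
    func debugAsProducer(role string, channels []string) {
    	log.Debug(role + " AsProducer: " + strings.Join(channels, ", "))
    }

    // debugAsConsumer mirrors the consumer-side form, appending the
    // subscription name after the " : " separator.
    func debugAsConsumer(role string, channels []string, subName string) {
    	log.Debug(role + " AsConsumer: " + strings.Join(channels, ", ") + " : " + subName)
    }

With helpers like these, the interim line log.Println("proxynode AsProducer: ", Params.SearchChannelNames) in proxy_node.go would become debugAsProducer("proxynode", Params.SearchChannelNames), matching the structured output of the other services.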