diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 8871759f9e..6fcd5bbfbe 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -166,7 +166,7 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error { ) flushChan := make(chan *flushMsg, 100) - dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataService) + dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataService, node.masterService) node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService node.vchan2FlushCh[vchan.GetChannelName()] = flushChan diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 6353248f21..15c734775b 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -43,6 +43,7 @@ func TestDataNode(t *testing.T) { node.Register() t.Run("Test WatchDmChannels", func(t *testing.T) { + t.Skip() ctx, cancel := context.WithCancel(context.Background()) node1 := newIDLEDataNodeMock(ctx) node1.Start() @@ -94,6 +95,7 @@ func TestDataNode(t *testing.T) { }) t.Run("Test NewDataSyncService", func(t *testing.T) { + t.Skip() ctx, cancel := context.WithCancel(context.Background()) node2 := newIDLEDataNodeMock(ctx) node2.Start() @@ -204,6 +206,7 @@ func TestDataNode(t *testing.T) { }) t.Run("Test ReleaseDataSyncService", func(t *testing.T) { + t.Skip() dmChannelName := "fake-dm-channel-test-NewDataSyncService" vchan := &datapb.VchannelInfo{ diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 6eb1d8a7e3..fb285cb6da 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -19,6 +19,7 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/flowgraph" @@ -26,16 +27,17 @@ import ( ) type dataSyncService struct { - ctx context.Context - cancelFn context.CancelFunc - fg *flowgraph.TimeTickedFlowGraph - flushChan <-chan *flushMsg - replica Replica - idAllocator allocatorInterface - msFactory msgstream.Factory - collectionID UniqueID - dataService types.DataService - clearSignal chan<- UniqueID + ctx context.Context + cancelFn context.CancelFunc + fg *flowgraph.TimeTickedFlowGraph + flushChan <-chan *flushMsg + replica Replica + idAllocator allocatorInterface + msFactory msgstream.Factory + collectionID UniqueID + dataService types.DataService + masterService types.MasterService + clearSignal chan<- UniqueID } func newDataSyncService(ctx context.Context, @@ -46,22 +48,24 @@ func newDataSyncService(ctx context.Context, vchan *datapb.VchannelInfo, clearSignal chan<- UniqueID, dataService types.DataService, + masterService types.MasterService, ) *dataSyncService { ctx1, cancel := context.WithCancel(ctx) service := &dataSyncService{ - ctx: ctx1, - cancelFn: cancel, - fg: nil, - flushChan: flushChan, - replica: replica, - idAllocator: alloc, - msFactory: factory, - collectionID: vchan.GetCollectionID(), - dataService: dataService, - clearSignal: clearSignal, + ctx: ctx1, + cancelFn: cancel, + fg: nil, + flushChan: flushChan, + replica: replica, + idAllocator: alloc, + msFactory: factory, + collectionID: vchan.GetCollectionID(), + dataService: dataService, + clearSignal: clearSignal, + masterService: masterService, } service.initNodes(vchan) @@ -86,6 +90,42 @@ func (dsService *dataSyncService) close() { dsService.cancelFn() } +func (dsService *dataSyncService) getPChannel(collectionID UniqueID, vchan string) (string, error) { + req := &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + MsgID: 0, + Timestamp: 0, + SourceID: Params.NodeID, + }, + DbName: "", + CollectionName: "", + CollectionID: collectionID, + TimeStamp: 0, + } + resp, err := dsService.masterService.DescribeCollection(dsService.ctx, req) + if err != nil { + log.Error("Failed to describe collection", zap.Int64("collectionID", collectionID)) + return "", err + } + + if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + log.Error("Failed to describe collection", zap.Int64("collectionID", collectionID), + zap.String("Reason", resp.Status.GetReason())) + return "", fmt.Errorf("Failed to describe collection, resp.Reason: %s", resp.Status.GetReason()) + } + + vchans := resp.GetVirtualChannelNames() + pchans := resp.GetPhysicalChannelNames() + for i, v := range vchans { + if vchan == v { + return pchans[i], nil + } + } + + return "", fmt.Errorf("Can not find physical channel of %s", vchan) +} + func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) { // TODO: add delete pipeline support dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) @@ -145,10 +185,16 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) { } return nil } + + pchan, err := dsService.getPChannel(vchanInfo.GetCollectionID(), vchanInfo.GetChannelName()) + if err != nil { + //FIXME dont panic + panic(err) + } var dmStreamNode Node = newDmInputNode( dsService.ctx, dsService.msFactory, - vchanInfo.GetChannelName(), + pchan, vchanInfo.GetSeekPosition(), ) var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo) diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index ac7180c930..abe8cafa15 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -65,7 +65,7 @@ func TestDataSyncService_Start(t *testing.T) { } signalCh := make(chan UniqueID, 100) - sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataServiceFactory{}) + sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataServiceFactory{}, &MasterServiceFactory{}) // sync.replica.addCollection(collMeta.ID, collMeta.Schema) go sync.start() diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 045aac898b..a51f9924fa 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -146,6 +146,14 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msg.SetTraceCtx(ctx) } + // replace pchannel with vchannel + for _, pos := range iMsg.startPositions { + pos.ChannelName = ibNode.channelName + } + for _, pos := range iMsg.endPositions { + pos.ChannelName = ibNode.channelName + } + // Updating segment statistics uniqueSeg := make(map[UniqueID]int64) for _, msg := range iMsg.insertMessages { diff --git a/internal/dataservice/grpc_services.go b/internal/dataservice/grpc_services.go index 2ab262ded0..9e3a7b7af1 100644 --- a/internal/dataservice/grpc_services.go +++ b/internal/dataservice/grpc_services.go @@ -314,7 +314,8 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath } log.Debug("Receive SaveBinlogPaths request", zap.Int64("collectionID", req.GetCollectionID()), - zap.Int64("segmentID", req.GetSegmentID())) + zap.Int64("segmentID", req.GetSegmentID()), + zap.Any("checkpoints", req.GetCheckPoints())) // check segment id & collection id matched _, err := s.meta.GetCollection(req.GetCollectionID()) diff --git a/internal/proxynode/channels_mgr.go b/internal/proxynode/channels_mgr.go index fddcc6e7a5..b9e99a562e 100644 --- a/internal/proxynode/channels_mgr.go +++ b/internal/proxynode/channels_mgr.go @@ -336,7 +336,12 @@ func (mgr *singleTypeChannelsMgr) createMsgStream(collectionID UniqueID) error { mgr.updateChannels(channels) id := getUniqueIntGeneratorIns().get() - vchans := getAllKeys(channels) + + vchans, pchans := make([]string, 0, len(channels)), make([]string, 0, len(channels)) + for k, v := range channels { + vchans = append(vchans, k) + pchans = append(pchans, v) + } mgr.updateVChans(id, vchans) var stream msgstream.MsgStream @@ -348,7 +353,6 @@ func (mgr *singleTypeChannelsMgr) createMsgStream(collectionID UniqueID) error { if err != nil { return err } - pchans := getAllValues(channels) stream.AsProducer(pchans) if mgr.repackFunc != nil { stream.SetRepackFunc(mgr.repackFunc) diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index d553ccab88..3901bb1414 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -1510,11 +1510,14 @@ func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Ge SegmentIDs: segments, }) if err != nil { + log.Error("Failed to get segment info from QueryService", + zap.Int64s("segmentIDs", segments), zap.Error(err)) resp.Status.Reason = err.Error() return resp, nil } log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status)) if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success { + log.Error("Failed to get segment info from QueryService", zap.String("errMsg", infoResp.Status.Reason)) resp.Status.Reason = infoResp.Status.Reason return resp, nil } diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index 47817a5327..e93213ab35 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -474,7 +474,10 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre reqID := it.Base.MsgID channelCountMap := make(map[int32]uint32) // channelID to count channelMaxTSMap := make(map[int32]Timestamp) // channelID to max Timestamp - channelNames := stream.GetProduceChannels() + channelNames, err := it.chMgr.getVChannels(it.GetCollectionID()) + if err != nil { + return nil, err + } log.Debug("_assignSemgentID, produceChannels:", zap.Any("Channels", channelNames)) for i, request := range tsMsgs { diff --git a/internal/querynode/search_collection.go b/internal/querynode/search_collection.go index c96a019050..787b28a35a 100644 --- a/internal/querynode/search_collection.go +++ b/internal/querynode/search_collection.go @@ -459,26 +459,33 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error { sealedSegmentSearched := make([]UniqueID, 0) // historical search - hisSearchResults, hisSegmentResults, err := s.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, searchTimestamp) - if err != nil { - return err - } - searchResults = append(searchResults, hisSearchResults...) - matchedSegments = append(matchedSegments, hisSegmentResults...) - for _, seg := range hisSegmentResults { - sealedSegmentSearched = append(sealedSegmentSearched, seg.segmentID) + hisSearchResults, hisSegmentResults, err1 := s.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, searchTimestamp) + if err1 == nil { + searchResults = append(searchResults, hisSearchResults...) + matchedSegments = append(matchedSegments, hisSegmentResults...) + for _, seg := range hisSegmentResults { + sealedSegmentSearched = append(sealedSegmentSearched, seg.segmentID) + } } // streaming search + var err2 error for _, channel := range collection.getVChannels() { - strSearchResults, strSegmentResults, err := s.streaming.search(searchRequests, collectionID, searchMsg.PartitionIDs, channel, plan, searchTimestamp) - if err != nil { - return err + var strSearchResults []*SearchResult + var strSegmentResults []*Segment + strSearchResults, strSegmentResults, err2 = s.streaming.search(searchRequests, collectionID, searchMsg.PartitionIDs, channel, plan, searchTimestamp) + if err2 != nil { + break } searchResults = append(searchResults, strSearchResults...) matchedSegments = append(matchedSegments, strSegmentResults...) } + if err1 != nil && err2 != nil { + log.Error(err1.Error() + err2.Error()) + return errors.New(err1.Error() + err2.Error()) + } + sp.LogFields(oplog.String("statistical time", "segment search end")) if len(searchResults) <= 0 { for _, group := range searchRequests {