diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index 7b85eb0307..3d11536de0 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -397,17 +397,14 @@ func (dct *DropCollectionTask) Execute() error { if err != nil { return err } - - dct.result, err = dct.masterClient.DropCollection(dct.DropCollectionRequest) - if err != nil { - return err + dct.result, _ = dct.masterClient.DropCollection(dct.DropCollectionRequest) + if dct.result.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(dct.result.Reason) } - err = globalInsertChannelsMap.closeInsertMsgStream(collID) if err != nil { return err } - return nil } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index bf9c3e02a9..d97b67d74d 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -61,6 +61,7 @@ type collectionReplica interface { getSegmentStatistics() []*internalpb2.SegmentStats getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) + getSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) replaceGrowingSegmentBySealedSegment(segment *Segment) error getTSafe() tSafe @@ -477,7 +478,7 @@ func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(s } if segment.getType() == segType { targetCollectionIDs = append(targetCollectionIDs, segment.collectionID) - targetPartitionIDs = append(targetPartitionIDs, segment.collectionID) + targetPartitionIDs = append(targetPartitionIDs, segment.partitionID) targetSegmentIDs = append(targetSegmentIDs, segment.segmentID) } } @@ -486,6 +487,25 @@ func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(s return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs } +func (colReplica *collectionReplicaImpl) getSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + targetCollectionIDs := make([]UniqueID, 0) + targetPartitionIDs := make([]UniqueID, 0) + targetSegmentIDs := make([]UniqueID, 0) + + for _, segment := range colReplica.segments { + if segment.getType() == segType { + targetCollectionIDs = append(targetCollectionIDs, segment.collectionID) + targetPartitionIDs = append(targetPartitionIDs, segment.partitionID) + targetSegmentIDs = append(targetSegmentIDs, segment.segmentID) + } + } + + return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs +} + func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go index 207386ad5c..08d867ec7f 100644 --- a/internal/querynode/index_loader.go +++ b/internal/querynode/index_loader.go @@ -41,8 +41,9 @@ type loadIndex struct { } func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) { - collectionIDs, _, segmentIDs := loader.replica.getEnabledSealedSegmentsBySegmentType(segTypeSealed) + collectionIDs, _, segmentIDs := loader.replica.getSealedSegmentsBySegmentType(segTypeSealed) if len(collectionIDs) <= 0 { + wg.Done() return } fmt.Println("do load index for sealed segments:", segmentIDs) diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go index 1f3268399a..6540011370 100644 --- a/internal/querynode/load_service.go +++ b/internal/querynode/load_service.go @@ -40,8 +40,9 @@ func (s *loadService) close() { } func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) { - collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getEnabledSealedSegmentsBySegmentType(segTypeGrowing) + collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getSealedSegmentsBySegmentType(segTypeGrowing) if len(collectionIDs) <= 0 { + wg.Done() return } fmt.Println("do load segment for growing segments:", segmentIDs) @@ -92,7 +93,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni _, buildID, errIndex := s.segLoader.indexLoader.getIndexInfo(collectionID, segmentID) if errIndex == nil { // we don't need load to vector fields - vectorFields, err := s.segLoader.replica.getVecFieldIDsByCollectionID(segmentID) + vectorFields, err := s.segLoader.replica.getVecFieldIDsByCollectionID(collectionID) if err != nil { return err } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index af2a9bed87..2d79e8a69d 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -146,14 +146,14 @@ func (node *QueryNode) Start() error { // init services and manager node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica) - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) + //node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream) node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan) // start services go node.dataSyncService.start() go node.searchService.start() - go node.metaService.start() + //go node.metaService.start() go node.loadService.start() go node.statsService.start()