mirror of https://github.com/milvus-io/milvus.git
fix datanode dropcollection error (#5690)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/5779/head
parent
351d87055a
commit
46699c4c2b
|
@ -184,6 +184,7 @@ func (node *DataNode) BackGroundGC(collIDCh <-chan UniqueID) {
|
|||
node.ReleaseDataSyncService(vchanName)
|
||||
}
|
||||
case <-node.ctx.Done():
|
||||
log.Info("DataNode ctx done")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,7 @@ func newDataSyncService(ctx context.Context,
|
|||
msFactory: factory,
|
||||
collectionID: vchan.GetCollectionID(),
|
||||
dataService: dataService,
|
||||
clearSignal: clearSignal,
|
||||
}
|
||||
|
||||
service.initNodes(vchan)
|
||||
|
@ -71,7 +72,6 @@ func (dsService *dataSyncService) start() {
|
|||
if dsService.fg != nil {
|
||||
log.Debug("Data Sync Service starting flowgraph")
|
||||
dsService.fg.Start()
|
||||
log.Debug("Data Sync Service starting flowgraph Done")
|
||||
} else {
|
||||
log.Debug("Data Sync Service flowgraph nil")
|
||||
}
|
||||
|
|
|
@ -76,8 +76,8 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
switch msg.Type() {
|
||||
case commonpb.MsgType_DropCollection:
|
||||
if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
|
||||
log.Info("Destroying current flowgraph", zap.Any("collectionID", ddn.collectionID))
|
||||
ddn.clearSignal <- ddn.collectionID
|
||||
log.Info("Destroying current flowgraph")
|
||||
}
|
||||
case commonpb.MsgType_Insert:
|
||||
log.Debug("DDNode with insert messages")
|
||||
|
|
|
@ -760,6 +760,12 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
|
|||
log.Error("get segment statistics updates wrong", zap.Int64("segmentID", segID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("Segment Statistics to Update",
|
||||
zap.Int64("Segment ID", updates.GetSegmentID()),
|
||||
zap.Int64("NumOfRows", updates.GetNumRows()),
|
||||
)
|
||||
|
||||
statsUpdates = append(statsUpdates, updates)
|
||||
}
|
||||
|
||||
|
|
|
@ -269,6 +269,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
|
|||
zap.Stringer("msgType", msg.Type()))
|
||||
continue
|
||||
}
|
||||
log.Debug("Receive DataNode segment statistics update")
|
||||
ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
|
||||
for _, stat := range ssMsg.SegStats {
|
||||
s.segmentManager.UpdateSegmentStats(stat)
|
||||
|
|
Loading…
Reference in New Issue