mirror of https://github.com/milvus-io/milvus.git
Some minor fixes and improvements (#16814)
/kind improvement Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>pull/16827/head
parent
64d2793fe9
commit
31ddff2056
|
@ -582,14 +582,16 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
|
|||
continue
|
||||
}
|
||||
|
||||
// Cache the segment and send it to its flush channel.
|
||||
flushedSeg = append(flushedSeg, segID)
|
||||
// Double check that the segment is still not cached.
|
||||
// Skip this flush if segment ID is cached, otherwise cache the segment ID and proceed.
|
||||
exist := node.segmentCache.checkOrCache(segID)
|
||||
if exist {
|
||||
logDupFlush(req.GetCollectionID(), segID)
|
||||
continue
|
||||
}
|
||||
// flushedSeg is only for logging purpose.
|
||||
flushedSeg = append(flushedSeg, segID)
|
||||
// Send the segment to its flush channel.
|
||||
flushCh <- flushMsg{
|
||||
msgID: req.GetBase().GetMsgID(),
|
||||
timestamp: req.GetBase().GetTimestamp(),
|
||||
|
|
|
@ -451,7 +451,10 @@ func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[t
|
|||
segID2PartID[s] = partID
|
||||
}
|
||||
} else {
|
||||
log.Debug("failed to get flushed segments from data coord", zap.Int64("collection_id", collID), zap.Int64("partition_id", partID), zap.Error(err))
|
||||
log.Error("failed to get flushed segments from dataCoord",
|
||||
zap.Int64("collection ID", collID),
|
||||
zap.Int64("partition ID", partID),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@ -2514,16 +2517,16 @@ func (c *Core) checkSegmentLoadedLoop(ctx context.Context, taskID int64, colID i
|
|||
log.Info("(in check segment loaded loop) context done, exiting checkSegmentLoadedLoop")
|
||||
return
|
||||
case <-ticker.C:
|
||||
log.Info("(in check segment loaded loop) check segments' loading states",
|
||||
zap.Int64("task ID", taskID))
|
||||
resp, err := c.CallGetSegmentInfoService(ctx, colID, segIDs)
|
||||
if err != nil {
|
||||
log.Warn("failed to call get segment info on queryCoord",
|
||||
log.Warn("(in check segment loaded loop) failed to call get segment info on queryCoord",
|
||||
zap.Int64("task ID", taskID),
|
||||
zap.Int64("collection ID", colID),
|
||||
zap.Int64s("segment IDs", segIDs))
|
||||
} else if len(resp.GetInfos()) == len(segIDs) {
|
||||
// Check if all segment info are loaded in queryNodes.
|
||||
log.Info("all import data segments loaded in queryNodes",
|
||||
log.Info("(in check segment loaded loop) all import data segments loaded in queryNodes",
|
||||
zap.Int64("task ID", taskID),
|
||||
zap.Int64("collection ID", colID),
|
||||
zap.Int64s("segment IDs", segIDs))
|
||||
c.importManager.updateTaskStateCode(taskID, commonpb.ImportState_DataQueryable)
|
||||
|
@ -2552,10 +2555,9 @@ func (c *Core) checkCompleteIndexLoop(ctx context.Context, taskID int64, colID i
|
|||
log.Info("(in check complete index loop) context done, exiting checkCompleteIndexLoop")
|
||||
return
|
||||
case <-ticker.C:
|
||||
log.Info("(in check complete index loop) check segments' index states",
|
||||
zap.Int64("task ID", taskID))
|
||||
if ct, err := c.CountCompleteIndex(ctx, colName, colID, segIDs); err == nil && ct == len(segIDs) {
|
||||
log.Info("all segment indices are ready!")
|
||||
log.Info("(in check complete index loop) all segment indices are ready!",
|
||||
zap.Int64("task ID", taskID))
|
||||
c.importManager.updateTaskStateCode(taskID, commonpb.ImportState_DataIndexed)
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue