mirror of https://github.com/milvus-io/milvus.git
fix: [cp]Zero flushReq metric for all sealed segs (#32531)
See also: #32399 pr: #32404 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/31531/head^2
parent
b62fba47b2
commit
cd830f8cbc
|
@ -74,8 +74,6 @@ func (node *DataNode) GetComponentStates(ctx context.Context, req *milvuspb.GetC
|
|||
|
||||
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
|
||||
serverID := node.GetNodeID()
|
||||
metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.TotalLabel).Inc()
|
||||
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("nodeID", serverID),
|
||||
zap.Int64("collectionID", req.GetCollectionID()),
|
||||
|
@ -83,7 +81,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
|
|||
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
|
||||
)
|
||||
log.Info("receive FlushSegments request")
|
||||
|
||||
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
|
||||
log.Warn("failed to FlushSegments", zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
|
@ -101,8 +98,6 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
|
|||
}
|
||||
|
||||
log.Info("success to FlushSegments")
|
||||
|
||||
metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(serverID), metrics.SuccessLabel).Inc()
|
||||
return merr.Success(), nil
|
||||
}
|
||||
|
||||
|
@ -371,12 +366,13 @@ func (node *DataNode) CheckChannelOperationProgress(ctx context.Context, req *da
|
|||
}
|
||||
|
||||
func (node *DataNode) FlushChannels(ctx context.Context, req *datapb.FlushChannelsRequest) (*commonpb.Status, error) {
|
||||
metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.TotalLabel).Inc()
|
||||
log := log.Ctx(ctx).With(zap.Int64("nodeId", node.GetNodeID()),
|
||||
zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs())),
|
||||
zap.Uint64("flushTs", req.GetFlushTs()),
|
||||
zap.Time("flushTs in Time", tsoutil.PhysicalTime(req.GetFlushTs())),
|
||||
zap.Strings("channels", req.GetChannels()))
|
||||
|
||||
log.Info("DataNode receives FlushChannels request")
|
||||
|
||||
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
|
||||
log.Warn("DataNode.FlushChannels failed", zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
|
@ -385,11 +381,13 @@ func (node *DataNode) FlushChannels(ctx context.Context, req *datapb.FlushChanne
|
|||
for _, channel := range req.GetChannels() {
|
||||
err := node.writeBufferManager.FlushChannel(ctx, channel, req.GetFlushTs())
|
||||
if err != nil {
|
||||
log.Warn("failed to flush channel", zap.String("channel", channel), zap.Error(err))
|
||||
log.Warn("WriteBufferManager failed to flush channel", zap.String("channel", channel), zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
}
|
||||
|
||||
metrics.DataNodeFlushReqCounter.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SuccessLabel).Inc()
|
||||
log.Info("success to FlushChannels")
|
||||
return merr.Success(), nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue