diff --git a/internal/flushcommon/broker/datacoord.go b/internal/flushcommon/broker/datacoord.go index 98bbc99660..453e3eb4a0 100644 --- a/internal/flushcommon/broker/datacoord.go +++ b/internal/flushcommon/broker/datacoord.go @@ -65,12 +65,10 @@ func (dc *dataCoordBroker) ReportTimeTick(ctx context.Context, msgs []*msgpb.Dat } func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*datapb.SegmentInfo, error) { - ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) - defer cancel() - - log := log.Ctx(ctx).With(zap.Int64s("segments", ids)) - getSegmentInfo := func(ids []int64) (*datapb.GetSegmentInfoResponse, error) { + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().DataCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) + defer cancel() + infoResp, err := dc.client.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo), @@ -80,12 +78,13 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]* IncludeUnHealthy: true, }) if err := merr.CheckRPCCall(infoResp, err); err != nil { - log.Warn("Fail to get SegmentInfo by ids from datacoord", zap.Error(err)) + log.Warn("Fail to get SegmentInfo by ids from datacoord", zap.Int64s("segments", ids), zap.Error(err)) return nil, err } + err = binlog.DecompressMultiBinLogs(infoResp.GetInfos()) if err != nil { - log.Warn("Fail to DecompressMultiBinLogs", zap.Error(err)) + log.Warn("Fail to DecompressMultiBinLogs", zap.Int64s("segments", ids), zap.Error(err)) return nil, err } return infoResp, nil @@ -99,6 +98,7 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]* resp, err := getSegmentInfo(ids[startIdx:endIdx]) if err != nil { + log.Warn("Fail to get SegmentInfo", zap.Int("total segment num", len(ids)), zap.Int("returned num", startIdx)) return nil, err } ret = append(ret, resp.GetInfos()...)