mirror of https://github.com/milvus-io/milvus.git
fix: datacoord broker use querycoord broker config and get segment info timeout (#36026)
relate: https://github.com/milvus-io/milvus/issues/36025 Will case channel with large segment num watch failed. Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/35961/head^2
parent
11325d9ed5
commit
9871841fa0
|
|
@ -65,12 +65,10 @@ func (dc *dataCoordBroker) ReportTimeTick(ctx context.Context, msgs []*msgpb.Dat
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*datapb.SegmentInfo, error) {
|
func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*datapb.SegmentInfo, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
log := log.Ctx(ctx).With(zap.Int64s("segments", ids))
|
|
||||||
|
|
||||||
getSegmentInfo := func(ids []int64) (*datapb.GetSegmentInfoResponse, error) {
|
getSegmentInfo := func(ids []int64) (*datapb.GetSegmentInfoResponse, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().DataCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
infoResp, err := dc.client.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
|
infoResp, err := dc.client.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
|
||||||
Base: commonpbutil.NewMsgBase(
|
Base: commonpbutil.NewMsgBase(
|
||||||
commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
|
commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
|
||||||
|
|
@ -80,12 +78,13 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*
|
||||||
IncludeUnHealthy: true,
|
IncludeUnHealthy: true,
|
||||||
})
|
})
|
||||||
if err := merr.CheckRPCCall(infoResp, err); err != nil {
|
if err := merr.CheckRPCCall(infoResp, err); err != nil {
|
||||||
log.Warn("Fail to get SegmentInfo by ids from datacoord", zap.Error(err))
|
log.Warn("Fail to get SegmentInfo by ids from datacoord", zap.Int64s("segments", ids), zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = binlog.DecompressMultiBinLogs(infoResp.GetInfos())
|
err = binlog.DecompressMultiBinLogs(infoResp.GetInfos())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Fail to DecompressMultiBinLogs", zap.Error(err))
|
log.Warn("Fail to DecompressMultiBinLogs", zap.Int64s("segments", ids), zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return infoResp, nil
|
return infoResp, nil
|
||||||
|
|
@ -99,6 +98,7 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*
|
||||||
|
|
||||||
resp, err := getSegmentInfo(ids[startIdx:endIdx])
|
resp, err := getSegmentInfo(ids[startIdx:endIdx])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Warn("Fail to get SegmentInfo", zap.Int("total segment num", len(ids)), zap.Int("returned num", startIdx))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ret = append(ret, resp.GetInfos()...)
|
ret = append(ret, resp.GetInfos()...)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue