mirror of https://github.com/milvus-io/milvus.git
Retry on SyncSegment failure (#25540)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/25552/head
parent
b22d6d2a5d
commit
dbfade77c5
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -164,21 +165,33 @@ func (c *SessionManager) Compaction(nodeID int64, plan *datapb.CompactionPlan) e
|
||||||
|
|
||||||
// SyncSegments is a grpc interface. It will send request to DataNode with provided `nodeID` synchronously.
|
// SyncSegments is a grpc interface. It will send request to DataNode with provided `nodeID` synchronously.
|
||||||
func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error {
|
func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error {
|
||||||
|
log := log.With(
|
||||||
|
zap.Int64("nodeID", nodeID),
|
||||||
|
zap.Int64("planID", req.GetPlanID()),
|
||||||
|
)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), rpcCompactionTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), rpcCompactionTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cli, err := c.getClient(ctx, nodeID)
|
cli, err := c.getClient(ctx, nodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("failed to get client", zap.Int64("nodeID", nodeID), zap.Error(err))
|
log.Warn("failed to get client", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := cli.SyncSegments(ctx, req)
|
err = retry.Do(context.Background(), func() error {
|
||||||
if err := VerifyResponse(resp, err); err != nil {
|
resp, err := cli.SyncSegments(ctx, req)
|
||||||
log.Warn("failed to sync segments", zap.Int64("node", nodeID), zap.Error(err), zap.Int64("planID", req.GetPlanID()))
|
if err := VerifyResponse(resp, err); err != nil {
|
||||||
|
log.Warn("failed to sync segments", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("failed to sync segments after retry", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("success to sync segments", zap.Int64("node", nodeID), zap.Any("planID", req.GetPlanID()))
|
log.Info("success to sync segments")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -926,7 +926,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
|
||||||
log.Warn("failed to SaveBinlogPaths",
|
log.Warn("failed to SaveBinlogPaths",
|
||||||
zap.Int64("segment ID", pack.segmentID),
|
zap.Int64("segment ID", pack.segmentID),
|
||||||
zap.Error(errors.New(rsp.GetReason())))
|
zap.Error(errors.New(rsp.GetReason())))
|
||||||
return fmt.Errorf("segment %d not found", pack.segmentID)
|
return nil
|
||||||
}
|
}
|
||||||
// meta error, datanode handles a virtual channel does not belong here
|
// meta error, datanode handles a virtual channel does not belong here
|
||||||
if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed {
|
if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed {
|
||||||
|
|
|
@ -698,10 +698,11 @@ func TestFlushNotifyFunc(t *testing.T) {
|
||||||
notifyFunc(&segmentFlushPack{})
|
notifyFunc(&segmentFlushPack{})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("normal segment not found", func(t *testing.T) {
|
|
||||||
|
t.Run("stale segment not found", func(t *testing.T) {
|
||||||
dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_SegmentNotFound
|
dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_SegmentNotFound
|
||||||
assert.Panics(t, func() {
|
assert.NotPanics(t, func() {
|
||||||
notifyFunc(&segmentFlushPack{flushed: true})
|
notifyFunc(&segmentFlushPack{flushed: false})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue