diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go
index 8c33324ee5..73ba0a058a 100644
--- a/internal/datacoord/session_manager.go
+++ b/internal/datacoord/session_manager.go
@@ -29,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/retry"
 	"go.uber.org/zap"
 )
 
@@ -164,21 +165,35 @@ func (c *SessionManager) Compaction(nodeID int64, plan *datapb.CompactionPlan) e
 
 // SyncSegments is a grpc interface. It will send request to DataNode with provided `nodeID` synchronously.
 func (c *SessionManager) SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error {
+	log := log.With(
+		zap.Int64("nodeID", nodeID),
+		zap.Int64("planID", req.GetPlanID()),
+	)
 	ctx, cancel := context.WithTimeout(context.Background(), rpcCompactionTimeout)
 	defer cancel()
 	cli, err := c.getClient(ctx, nodeID)
 	if err != nil {
-		log.Warn("failed to get client", zap.Int64("nodeID", nodeID), zap.Error(err))
+		log.Warn("failed to get client", zap.Error(err))
 		return err
 	}
 
-	resp, err := cli.SyncSegments(ctx, req)
-	if err := VerifyResponse(resp, err); err != nil {
-		log.Warn("failed to sync segments", zap.Int64("node", nodeID), zap.Error(err), zap.Int64("planID", req.GetPlanID()))
+	// Tie the retry loop to ctx: each attempt's RPC already uses ctx, so any
+	// attempt made after its deadline could only fail.
+	err = retry.Do(ctx, func() error {
+		resp, err := cli.SyncSegments(ctx, req)
+		if err := VerifyResponse(resp, err); err != nil {
+			log.Warn("failed to sync segments", zap.Error(err))
+			return err
+		}
+		return nil
+	})
+
+	if err != nil {
+		log.Warn("failed to sync segments after retry", zap.Error(err))
 		return err
 	}
 
-	log.Info("success to sync segments", zap.Int64("node", nodeID), zap.Any("planID", req.GetPlanID()))
+	log.Info("success to sync segments")
 	return nil
 }
 
diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go
index 7ee017d7f7..a3c945a958 100644
--- a/internal/datanode/flush_manager.go
+++ b/internal/datanode/flush_manager.go
@@ -926,7 +926,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
 				log.Warn("failed to SaveBinlogPaths",
 					zap.Int64("segment ID", pack.segmentID),
 					zap.Error(errors.New(rsp.GetReason())))
-				return fmt.Errorf("segment %d not found", pack.segmentID)
+				return nil
 			}
 			// meta error, datanode handles a virtual channel does not belong here
 			if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed {
diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go
index f73da12fdb..9398ec1d31 100644
--- a/internal/datanode/flush_manager_test.go
+++ b/internal/datanode/flush_manager_test.go
@@ -698,10 +698,11 @@ func TestFlushNotifyFunc(t *testing.T) {
 			notifyFunc(&segmentFlushPack{})
 		})
 	})
-	t.Run("normal segment not found", func(t *testing.T) {
+
+	t.Run("stale segment not found", func(t *testing.T) {
 		dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_SegmentNotFound
-		assert.Panics(t, func() {
-			notifyFunc(&segmentFlushPack{flushed: true})
+		assert.NotPanics(t, func() {
+			notifyFunc(&segmentFlushPack{flushed: false})
 		})
 	})
 