diff --git a/internal/datanode/syncmgr/mock_serializer.go b/internal/datanode/syncmgr/mock_serializer.go new file mode 100644 index 0000000000..fdbf823699 --- /dev/null +++ b/internal/datanode/syncmgr/mock_serializer.go @@ -0,0 +1,91 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package syncmgr + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockSerializer is an autogenerated mock type for the Serializer type +type MockSerializer struct { + mock.Mock +} + +type MockSerializer_Expecter struct { + mock *mock.Mock +} + +func (_m *MockSerializer) EXPECT() *MockSerializer_Expecter { + return &MockSerializer_Expecter{mock: &_m.Mock} +} + +// EncodeBuffer provides a mock function with given fields: ctx, pack +func (_m *MockSerializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { + ret := _m.Called(ctx, pack) + + var r0 Task + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) (Task, error)); ok { + return rf(ctx, pack) + } + if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) Task); ok { + r0 = rf(ctx, pack) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(Task) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *SyncPack) error); ok { + r1 = rf(ctx, pack) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockSerializer_EncodeBuffer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EncodeBuffer' +type MockSerializer_EncodeBuffer_Call struct { + *mock.Call +} + +// EncodeBuffer is a helper method to define mock.On call +// - ctx context.Context +// - pack *SyncPack +func (_e *MockSerializer_Expecter) EncodeBuffer(ctx interface{}, pack interface{}) *MockSerializer_EncodeBuffer_Call { + return &MockSerializer_EncodeBuffer_Call{Call: _e.mock.On("EncodeBuffer", ctx, pack)} +} + +func (_c *MockSerializer_EncodeBuffer_Call) Run(run func(ctx context.Context, pack *SyncPack)) *MockSerializer_EncodeBuffer_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*SyncPack)) + }) + return _c +} + +func (_c *MockSerializer_EncodeBuffer_Call) Return(_a0 Task, _a1 error) *MockSerializer_EncodeBuffer_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockSerializer_EncodeBuffer_Call) RunAndReturn(run func(context.Context, *SyncPack) (Task, error)) *MockSerializer_EncodeBuffer_Call { + _c.Call.Return(run) + return _c +} + +// NewMockSerializer creates a new instance of MockSerializer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockSerializer(t interface { + mock.TestingT + Cleanup(func()) +}) *MockSerializer { + mock := &MockSerializer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datanode/syncmgr/storage_serializer.go b/internal/datanode/syncmgr/storage_serializer.go index 0bc9f109b1..da26dcbf50 100644 --- a/internal/datanode/syncmgr/storage_serializer.go +++ b/internal/datanode/syncmgr/storage_serializer.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" @@ -107,13 +108,15 @@ func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) } if pack.isFlush { - mergedStatsBlob, err := s.serializeMergedPkStats(pack) - if err != nil { - log.Warn("failed to serialize merged stats log", zap.Error(err)) - return nil, err + if pack.level != datapb.SegmentLevel_L0 { + mergedStatsBlob, err := s.serializeMergedPkStats(pack) + if err != nil { + log.Warn("failed to serialize merged stats log", zap.Error(err)) + return nil, err + } + task.mergedStatsBlob = mergedStatsBlob } - task.mergedStatsBlob = mergedStatsBlob task.WithFlush() } diff --git a/internal/datanode/syncmgr/storage_v2_serializer.go b/internal/datanode/syncmgr/storage_v2_serializer.go index 36cecea830..6b99ca4521 100644 --- a/internal/datanode/syncmgr/storage_v2_serializer.go +++ b/internal/datanode/syncmgr/storage_v2_serializer.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-storage/go/storage/options" "github.com/milvus-io/milvus-storage/go/storage/schema" "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" iTypeutil "github.com/milvus-io/milvus/internal/util/typeutil" @@ -101,13 +102,15 @@ func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) } if pack.isFlush { - mergedStatsBlob, err := s.serializeMergedPkStats(pack) - if err != nil { - log.Warn("failed to serialize merged stats log", zap.Error(err)) - return nil, err - } + if pack.level != datapb.SegmentLevel_L0 { + mergedStatsBlob, err := s.serializeMergedPkStats(pack) + if err != nil { + log.Warn("failed to serialize merged stats log", zap.Error(err)) + return nil, err + } - task.mergedStatsBlob = mergedStatsBlob + task.mergedStatsBlob = mergedStatsBlob + } task.WithFlush() } diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go index 3dcd0b9063..c57c32b9bb 100644 --- a/internal/datanode/writebuffer/bf_write_buffer_test.go +++ b/internal/datanode/writebuffer/bf_write_buffer_test.go @@ -211,7 +211,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() { func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() { params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true") - defer params.Params.Reset(params.Params.CommonCfg.EnableStorageV2.Key) + defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false") params.Params.CommonCfg.StorageScheme.SwapTempValue("file") tmpDir := s.T().TempDir() arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields) @@ -241,6 +241,7 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() { func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() { params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false") paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1") tmpDir := s.T().TempDir() arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields) diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index 300c26e69f..f1753f1ee3 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" @@ -253,13 +254,16 @@ func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64 } func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) { + log := log.Ctx(ctx) for _, segmentID := range segmentIDs { syncTask, err := wb.getSyncTask(ctx, segmentID) if err != nil { - // TODO check err type - // segment info not found - log.Ctx(ctx).Warn("segment not found in meta", zap.Int64("segmentID", segmentID)) - continue + if errors.Is(err, merr.ErrSegmentNotFound) { + log.Warn("segment not found in meta", zap.Int64("segmentID", segmentID)) + continue + } else { + log.Fatal("failed to get sync task", zap.Int64("segmentID", segmentID), zap.Error(err)) + } } // discard Future here, handle error in callback diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 8244eeb36f..fa4abd18f5 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -263,6 +264,39 @@ func (s *WriteBufferSuite) TestGetCheckpoint() { }) } +func (s *WriteBufferSuite) TestSyncSegmentsError() { + wb, err := newWriteBufferBase(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{ + pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }, + }) + s.Require().NoError(err) + + serializer := syncmgr.NewMockSerializer(s.T()) + + wb.serializer = serializer + + segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + ID: 1, + }, nil) + s.metacache.EXPECT().GetSegmentByID(int64(1)).Return(segment, true) + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() + + s.Run("segment_not_found", func() { + serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(nil, merr.WrapErrSegmentNotFound(1)).Once() + s.NotPanics(func() { + wb.syncSegments(context.Background(), []int64{1}) + }) + }) + + s.Run("other_err", func() { + serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once() + s.Panics(func() { + wb.syncSegments(context.Background(), []int64{1}) + }) + }) +} + func TestWriteBufferBase(t *testing.T) { suite.Run(t, new(WriteBufferSuite)) }