mirror of https://github.com/milvus-io/milvus.git
fix: serializer shall bypass L0 segment merge stats step (#29636)
See also #27675 Fix logic problem introduced by #29413, which is serializer tries to merge statslog list while level segments do not have statslog. This shall result returning error. `writeBufferBase` ignores this error but it shall only ignore `ErrSegmentNotFound`. This PR add logic checking segment level before execution of merging statslog list. And add error type check for getSyncTask failure. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/29688/head
parent
aa967de0a8
commit
79c06c5e73
|
@ -0,0 +1,91 @@
|
|||
// Code generated by mockery v2.32.4. DO NOT EDIT.
|
||||
|
||||
package syncmgr
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockSerializer is an autogenerated mock type for the Serializer type
|
||||
type MockSerializer struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
type MockSerializer_Expecter struct {
|
||||
mock *mock.Mock
|
||||
}
|
||||
|
||||
func (_m *MockSerializer) EXPECT() *MockSerializer_Expecter {
|
||||
return &MockSerializer_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// EncodeBuffer provides a mock function with given fields: ctx, pack
|
||||
func (_m *MockSerializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) {
|
||||
ret := _m.Called(ctx, pack)
|
||||
|
||||
var r0 Task
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) (Task, error)); ok {
|
||||
return rf(ctx, pack)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) Task); ok {
|
||||
r0 = rf(ctx, pack)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(Task)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *SyncPack) error); ok {
|
||||
r1 = rf(ctx, pack)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockSerializer_EncodeBuffer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EncodeBuffer'
|
||||
type MockSerializer_EncodeBuffer_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// EncodeBuffer is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - pack *SyncPack
|
||||
func (_e *MockSerializer_Expecter) EncodeBuffer(ctx interface{}, pack interface{}) *MockSerializer_EncodeBuffer_Call {
|
||||
return &MockSerializer_EncodeBuffer_Call{Call: _e.mock.On("EncodeBuffer", ctx, pack)}
|
||||
}
|
||||
|
||||
func (_c *MockSerializer_EncodeBuffer_Call) Run(run func(ctx context.Context, pack *SyncPack)) *MockSerializer_EncodeBuffer_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*SyncPack))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSerializer_EncodeBuffer_Call) Return(_a0 Task, _a1 error) *MockSerializer_EncodeBuffer_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSerializer_EncodeBuffer_Call) RunAndReturn(run func(context.Context, *SyncPack) (Task, error)) *MockSerializer_EncodeBuffer_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewMockSerializer creates a new instance of MockSerializer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockSerializer(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *MockSerializer {
|
||||
mock := &MockSerializer{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
|
@ -26,6 +26,7 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/datanode/metacache"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
|
@ -107,13 +108,15 @@ func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
|
|||
}
|
||||
|
||||
if pack.isFlush {
|
||||
if pack.level != datapb.SegmentLevel_L0 {
|
||||
mergedStatsBlob, err := s.serializeMergedPkStats(pack)
|
||||
if err != nil {
|
||||
log.Warn("failed to serialize merged stats log", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
task.mergedStatsBlob = mergedStatsBlob
|
||||
}
|
||||
|
||||
task.WithFlush()
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/milvus-io/milvus-storage/go/storage/options"
|
||||
"github.com/milvus-io/milvus-storage/go/storage/schema"
|
||||
"github.com/milvus-io/milvus/internal/datanode/metacache"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
iTypeutil "github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
@ -101,6 +102,7 @@ func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
|
|||
}
|
||||
|
||||
if pack.isFlush {
|
||||
if pack.level != datapb.SegmentLevel_L0 {
|
||||
mergedStatsBlob, err := s.serializeMergedPkStats(pack)
|
||||
if err != nil {
|
||||
log.Warn("failed to serialize merged stats log", zap.Error(err))
|
||||
|
@ -108,6 +110,7 @@ func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
|
|||
}
|
||||
|
||||
task.mergedStatsBlob = mergedStatsBlob
|
||||
}
|
||||
task.WithFlush()
|
||||
}
|
||||
|
||||
|
|
|
@ -211,7 +211,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() {
|
|||
|
||||
func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
|
||||
params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true")
|
||||
defer params.Params.Reset(params.Params.CommonCfg.EnableStorageV2.Key)
|
||||
defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
|
||||
params.Params.CommonCfg.StorageScheme.SwapTempValue("file")
|
||||
tmpDir := s.T().TempDir()
|
||||
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields)
|
||||
|
@ -241,6 +241,7 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
|
|||
|
||||
func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
|
||||
params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true")
|
||||
defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
|
||||
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
|
||||
tmpDir := s.T().TempDir()
|
||||
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
@ -253,13 +254,16 @@ func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64
|
|||
}
|
||||
|
||||
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) {
|
||||
log := log.Ctx(ctx)
|
||||
for _, segmentID := range segmentIDs {
|
||||
syncTask, err := wb.getSyncTask(ctx, segmentID)
|
||||
if err != nil {
|
||||
// TODO check err type
|
||||
// segment info not found
|
||||
log.Ctx(ctx).Warn("segment not found in meta", zap.Int64("segmentID", segmentID))
|
||||
if errors.Is(err, merr.ErrSegmentNotFound) {
|
||||
log.Warn("segment not found in meta", zap.Int64("segmentID", segmentID))
|
||||
continue
|
||||
} else {
|
||||
log.Fatal("failed to get sync task", zap.Int64("segmentID", segmentID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// discard Future here, handle error in callback
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
|
@ -263,6 +264,39 @@ func (s *WriteBufferSuite) TestGetCheckpoint() {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *WriteBufferSuite) TestSyncSegmentsError() {
|
||||
wb, err := newWriteBufferBase(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
|
||||
pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet {
|
||||
return metacache.NewBloomFilterSet()
|
||||
},
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
|
||||
serializer := syncmgr.NewMockSerializer(s.T())
|
||||
|
||||
wb.serializer = serializer
|
||||
|
||||
segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
}, nil)
|
||||
s.metacache.EXPECT().GetSegmentByID(int64(1)).Return(segment, true)
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
s.Run("segment_not_found", func() {
|
||||
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(nil, merr.WrapErrSegmentNotFound(1)).Once()
|
||||
s.NotPanics(func() {
|
||||
wb.syncSegments(context.Background(), []int64{1})
|
||||
})
|
||||
})
|
||||
|
||||
s.Run("other_err", func() {
|
||||
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
|
||||
s.Panics(func() {
|
||||
wb.syncSegments(context.Background(), []int64{1})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestWriteBufferBase(t *testing.T) {
|
||||
suite.Run(t, new(WriteBufferSuite))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue