enhance: make serialization part of the sync task to support file format changes (#38946)

See #38945

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
pull/39548/head
Ted Xu 2025-01-23 15:49:05 +08:00 committed by GitHub
parent 176ef631bc
commit 56659bacbb
20 changed files with 839 additions and 924 deletions
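
For review context, here is a minimal sketch of the API change carried by this diff, using only names that appear in the hunks below; the wrapper function, variable names, and package clause are illustrative, not part of the change. Per-pack attributes that were previously set through SyncTask builders (or produced via NewStorageSerializer/EncodeBuffer) now travel in a SyncPack handed to the task, and serialization plus binlog upload happen inside SyncTask.Run through the new BulkPackWriter.

package example // hypothetical, for illustration only

import (
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
	"github.com/milvus-io/milvus/internal/storage"
)

// buildSyncTask sketches the new wiring: per-segment data and metadata go into
// the SyncPack, while task-level dependencies stay on the SyncTask. Before this
// change, callers went through NewStorageSerializer(...) and EncodeBuffer(ctx, pack).
func buildSyncTask(pack *syncmgr.SyncPack, alloc allocator.Interface,
	mc metacache.MetaCache, cm storage.ChunkManager, mw syncmgr.MetaWriter,
) syncmgr.Task {
	return syncmgr.NewSyncTask().
		WithSyncPack(pack).   // collection/partition/segment IDs, checkpoint, level, buffered data
		WithAllocator(alloc). // used by BulkPackWriter to pre-fetch log IDs
		WithMetaCache(mc).
		WithChunkManager(cm).
		WithMetaWriter(mw) // Run() serializes, writes binlogs, then saves paths via the meta writer
}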


@ -499,7 +499,7 @@ generate-mockery-flushcommon: getdeps
$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/flushcommon/metacache --output=$(PWD)/internal/flushcommon/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=MetaWriter --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_meta_writer.go --with-expecter --structname=MockMetaWriter --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=Serializer --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_serializer.go --with-expecter --structname=MockSerializer --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=PackWriter --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_pack_writer.go --with-expecter --structname=MockPackWriter --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=Task --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_task.go --with-expecter --structname=MockTask --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_manager.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage


@ -138,7 +138,7 @@ func (s *L0ImportSuite) TestL0Import() {
task.(*syncmgr.SyncTask).WithAllocator(alloc)
s.cm.(*mocks.ChunkManager).EXPECT().RootPath().Return("mock-rootpath")
s.cm.(*mocks.ChunkManager).EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil)
s.cm.(*mocks.ChunkManager).EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task.(*syncmgr.SyncTask).WithChunkManager(s.cm)
err := task.Run(context.Background())


@ -69,17 +69,6 @@ func NewSyncTask(ctx context.Context,
}, metacache.NewBM25StatsFactory)
}
var serializer syncmgr.Serializer
var err error
serializer, err = syncmgr.NewStorageSerializer(
allocator,
metaCache,
nil,
)
if err != nil {
return nil, err
}
segmentLevel := datapb.SegmentLevel_L1
if insertData == nil && deleteData != nil {
segmentLevel = datapb.SegmentLevel_L0
@ -100,7 +89,8 @@ func NewSyncTask(ctx context.Context,
syncPack.WithBM25Stats(bm25Stats)
}
return serializer.EncodeBuffer(ctx, syncPack)
task := syncmgr.NewSyncTask().WithAllocator(allocator).WithMetaCache(metaCache).WithSyncPack(syncPack)
return task, nil
}
func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) {


@ -46,7 +46,7 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
insertFieldBinlogs := lo.MapToSlice(pack.insertBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
statsFieldBinlogs := lo.MapToSlice(pack.statsBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
if len(pack.deltaBinlog.Binlogs) > 0 {
if pack.deltaBinlog != nil && len(pack.deltaBinlog.Binlogs) > 0 {
deltaFieldBinlogs = append(deltaFieldBinlogs, pack.deltaBinlog)
}
@ -102,8 +102,8 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
CheckPoints: checkPoints,
StartPositions: startPos,
Flushed: pack.isFlush,
Dropped: pack.isDrop,
Flushed: pack.pack.isFlush,
Dropped: pack.pack.isDrop,
Channel: pack.channelName,
SegLevel: pack.level,
}
@ -111,7 +111,7 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
err := b.broker.SaveBinlogPaths(ctx, req)
// Segment not found during stale segment flush; the segment might have been compacted already.
// Stop retrying and still proceed to the end, ignoring this error.
if !pack.isFlush && errors.Is(err, merr.ErrSegmentNotFound) {
if !pack.pack.isFlush && errors.Is(err, merr.ErrSegmentNotFound) {
log.Warn("stale segment not found, could be compacted",
zap.Int64("segmentID", pack.segmentID))
log.Warn("failed to SaveBinlogPaths",


@ -46,8 +46,7 @@ func (s *MetaWriterSuite) TestNormalSave() {
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
task := NewSyncTask()
task.WithMetaCache(s.metacache)
task := NewSyncTask().WithMetaCache(s.metacache).WithSyncPack(new(SyncPack))
err := s.writer.UpdateSync(ctx, task)
s.NoError(err)
}
@ -62,8 +61,7 @@ func (s *MetaWriterSuite) TestReturnError() {
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
task := NewSyncTask()
task.WithMetaCache(s.metacache)
task := NewSyncTask().WithMetaCache(s.metacache).WithSyncPack(new(SyncPack))
err := s.writer.UpdateSync(ctx, task)
s.Error(err)
}


@ -0,0 +1,130 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
package syncmgr
import (
context "context"
datapb "github.com/milvus-io/milvus/pkg/proto/datapb"
mock "github.com/stretchr/testify/mock"
)
// MockPackWriter is an autogenerated mock type for the PackWriter type
type MockPackWriter struct {
mock.Mock
}
type MockPackWriter_Expecter struct {
mock *mock.Mock
}
func (_m *MockPackWriter) EXPECT() *MockPackWriter_Expecter {
return &MockPackWriter_Expecter{mock: &_m.Mock}
}
// Write provides a mock function with given fields: ctx, pack
func (_m *MockPackWriter) Write(ctx context.Context, pack *SyncPack) ([]*datapb.Binlog, *datapb.Binlog, *datapb.Binlog, *datapb.Binlog, int64, error) {
ret := _m.Called(ctx, pack)
if len(ret) == 0 {
panic("no return value specified for Write")
}
var r0 []*datapb.Binlog
var r1 *datapb.Binlog
var r2 *datapb.Binlog
var r3 *datapb.Binlog
var r4 int64
var r5 error
if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) ([]*datapb.Binlog, *datapb.Binlog, *datapb.Binlog, *datapb.Binlog, int64, error)); ok {
return rf(ctx, pack)
}
if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) []*datapb.Binlog); ok {
r0 = rf(ctx, pack)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*datapb.Binlog)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *SyncPack) *datapb.Binlog); ok {
r1 = rf(ctx, pack)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*datapb.Binlog)
}
}
if rf, ok := ret.Get(2).(func(context.Context, *SyncPack) *datapb.Binlog); ok {
r2 = rf(ctx, pack)
} else {
if ret.Get(2) != nil {
r2 = ret.Get(2).(*datapb.Binlog)
}
}
if rf, ok := ret.Get(3).(func(context.Context, *SyncPack) *datapb.Binlog); ok {
r3 = rf(ctx, pack)
} else {
if ret.Get(3) != nil {
r3 = ret.Get(3).(*datapb.Binlog)
}
}
if rf, ok := ret.Get(4).(func(context.Context, *SyncPack) int64); ok {
r4 = rf(ctx, pack)
} else {
r4 = ret.Get(4).(int64)
}
if rf, ok := ret.Get(5).(func(context.Context, *SyncPack) error); ok {
r5 = rf(ctx, pack)
} else {
r5 = ret.Error(5)
}
return r0, r1, r2, r3, r4, r5
}
// MockPackWriter_Write_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Write'
type MockPackWriter_Write_Call struct {
*mock.Call
}
// Write is a helper method to define mock.On call
// - ctx context.Context
// - pack *SyncPack
func (_e *MockPackWriter_Expecter) Write(ctx interface{}, pack interface{}) *MockPackWriter_Write_Call {
return &MockPackWriter_Write_Call{Call: _e.mock.On("Write", ctx, pack)}
}
func (_c *MockPackWriter_Write_Call) Run(run func(ctx context.Context, pack *SyncPack)) *MockPackWriter_Write_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*SyncPack))
})
return _c
}
func (_c *MockPackWriter_Write_Call) Return(inserts []*datapb.Binlog, deletes *datapb.Binlog, stats *datapb.Binlog, bm25Stats *datapb.Binlog, size int64, err error) *MockPackWriter_Write_Call {
_c.Call.Return(inserts, deletes, stats, bm25Stats, size, err)
return _c
}
func (_c *MockPackWriter_Write_Call) RunAndReturn(run func(context.Context, *SyncPack) ([]*datapb.Binlog, *datapb.Binlog, *datapb.Binlog, *datapb.Binlog, int64, error)) *MockPackWriter_Write_Call {
_c.Call.Return(run)
return _c
}
// NewMockPackWriter creates a new instance of MockPackWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockPackWriter(t interface {
mock.TestingT
Cleanup(func())
}) *MockPackWriter {
mock := &MockPackWriter{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
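
A brief usage sketch for this generated mock, following the expecter pattern defined above; the test name and stubbed return values are illustrative and not part of the diff. Assuming it sits next to the mock in the syncmgr package:

package syncmgr

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

// TestMockPackWriterUsage is a hypothetical test showing how MockPackWriter can
// stand in for a PackWriter: stub Write once, then assert on the returned size.
func TestMockPackWriterUsage(t *testing.T) {
	pw := NewMockPackWriter(t)
	pw.EXPECT().
		Write(mock.Anything, mock.Anything).
		Return(nil, nil, nil, nil, int64(0), nil). // no binlogs, zero bytes written
		Once()

	_, _, _, _, size, err := pw.Write(context.Background(), new(SyncPack))
	assert.NoError(t, err)
	assert.Zero(t, size)
}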


@ -1,95 +0,0 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
package syncmgr
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// MockSerializer is an autogenerated mock type for the Serializer type
type MockSerializer struct {
mock.Mock
}
type MockSerializer_Expecter struct {
mock *mock.Mock
}
func (_m *MockSerializer) EXPECT() *MockSerializer_Expecter {
return &MockSerializer_Expecter{mock: &_m.Mock}
}
// EncodeBuffer provides a mock function with given fields: ctx, pack
func (_m *MockSerializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) {
ret := _m.Called(ctx, pack)
if len(ret) == 0 {
panic("no return value specified for EncodeBuffer")
}
var r0 Task
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) (Task, error)); ok {
return rf(ctx, pack)
}
if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) Task); ok {
r0 = rf(ctx, pack)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(Task)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *SyncPack) error); ok {
r1 = rf(ctx, pack)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockSerializer_EncodeBuffer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EncodeBuffer'
type MockSerializer_EncodeBuffer_Call struct {
*mock.Call
}
// EncodeBuffer is a helper method to define mock.On call
// - ctx context.Context
// - pack *SyncPack
func (_e *MockSerializer_Expecter) EncodeBuffer(ctx interface{}, pack interface{}) *MockSerializer_EncodeBuffer_Call {
return &MockSerializer_EncodeBuffer_Call{Call: _e.mock.On("EncodeBuffer", ctx, pack)}
}
func (_c *MockSerializer_EncodeBuffer_Call) Run(run func(ctx context.Context, pack *SyncPack)) *MockSerializer_EncodeBuffer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*SyncPack))
})
return _c
}
func (_c *MockSerializer_EncodeBuffer_Call) Return(_a0 Task, _a1 error) *MockSerializer_EncodeBuffer_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockSerializer_EncodeBuffer_Call) RunAndReturn(run func(context.Context, *SyncPack) (Task, error)) *MockSerializer_EncodeBuffer_Call {
_c.Call.Return(run)
return _c
}
// NewMockSerializer creates a new instance of MockSerializer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockSerializer(t interface {
mock.TestingT
Cleanup(func())
}) *MockSerializer {
mock := &MockSerializer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}


@ -1,28 +1,35 @@
package syncmgr
import (
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func NewSyncTask() *SyncTask {
return &SyncTask{
isFlush: false,
insertBinlogs: make(map[int64]*datapb.FieldBinlog),
statsBinlogs: make(map[int64]*datapb.FieldBinlog),
deltaBinlog: &datapb.FieldBinlog{},
bm25Binlogs: make(map[int64]*datapb.FieldBinlog),
segmentData: make(map[string][]byte),
binlogBlobs: make(map[int64]*storage.Blob),
}
return new(SyncTask)
}
func (t *SyncTask) WithSyncPack(pack *SyncPack) *SyncTask {
t.pack = pack
// legacy code, remove later
t.collectionID = t.pack.collectionID
t.partitionID = t.pack.partitionID
t.channelName = t.pack.channelName
t.segmentID = t.pack.segmentID
t.batchRows = t.pack.batchRows
// t.metacache = t.pack.metacache
// t.schema = t.metacache.Schema()
t.startPosition = t.pack.startPosition
t.checkpoint = t.pack.checkpoint
t.level = t.pack.level
t.dataSource = t.pack.dataSource
t.tsFrom = t.pack.tsFrom
t.tsTo = t.pack.tsTo
t.failureCallback = t.pack.errHandler
return t
}
func (t *SyncTask) WithChunkManager(cm storage.ChunkManager) *SyncTask {
@ -35,56 +42,8 @@ func (t *SyncTask) WithAllocator(allocator allocator.Interface) *SyncTask {
return t
}
func (t *SyncTask) WithStartPosition(start *msgpb.MsgPosition) *SyncTask {
t.startPosition = start
return t
}
func (t *SyncTask) WithCheckpoint(cp *msgpb.MsgPosition) *SyncTask {
t.checkpoint = cp
return t
}
func (t *SyncTask) WithCollectionID(collID int64) *SyncTask {
t.collectionID = collID
return t
}
func (t *SyncTask) WithPartitionID(partID int64) *SyncTask {
t.partitionID = partID
return t
}
func (t *SyncTask) WithSegmentID(segID int64) *SyncTask {
t.segmentID = segID
return t
}
func (t *SyncTask) WithChannelName(chanName string) *SyncTask {
t.channelName = chanName
return t
}
func (t *SyncTask) WithSchema(schema *schemapb.CollectionSchema) *SyncTask {
t.schema = schema
t.pkField = lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool {
return field.GetIsPrimaryKey()
})
return t
}
func (t *SyncTask) WithTimeRange(from, to typeutil.Timestamp) *SyncTask {
t.tsFrom, t.tsTo = from, to
return t
}
func (t *SyncTask) WithFlush() *SyncTask {
t.isFlush = true
return t
}
func (t *SyncTask) WithDrop() *SyncTask {
t.isDrop = true
t.pack.isDrop = true
return t
}
@ -107,18 +66,3 @@ func (t *SyncTask) WithFailureCallback(callback func(error)) *SyncTask {
t.failureCallback = callback
return t
}
func (t *SyncTask) WithBatchRows(batchRows int64) *SyncTask {
t.batchRows = batchRows
return t
}
func (t *SyncTask) WithLevel(level datapb.SegmentLevel) *SyncTask {
t.level = level
return t
}
func (t *SyncTask) WithDataSource(source string) *SyncTask {
t.dataSource = source
return t
}


@ -0,0 +1,318 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package syncmgr
import (
"context"
"path"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/retry"
)
type PackWriter interface {
Write(ctx context.Context, pack *SyncPack) (
inserts []*datapb.Binlog, deletes *datapb.Binlog, stats *datapb.Binlog, bm25Stats *datapb.Binlog,
size int64, err error)
}
type BulkPackWriter struct {
metaCache metacache.MetaCache
chunkManager storage.ChunkManager
allocator allocator.Interface
writeRetryOpts []retry.Option
// prefetched log ids
ids []int64
sizeWritten int64
}
func NewBulkPackWriter(metaCache metacache.MetaCache, chunkManager storage.ChunkManager,
allocator allocator.Interface, writeRetryOpts ...retry.Option,
) *BulkPackWriter {
return &BulkPackWriter{
metaCache: metaCache,
chunkManager: chunkManager,
allocator: allocator,
writeRetryOpts: writeRetryOpts,
}
}
func (bw *BulkPackWriter) Write(ctx context.Context, pack *SyncPack) (
inserts map[int64]*datapb.FieldBinlog,
deltas *datapb.FieldBinlog,
stats map[int64]*datapb.FieldBinlog,
bm25Stats map[int64]*datapb.FieldBinlog,
size int64,
err error,
) {
err = bw.prefetchIDs(pack)
if err != nil {
log.Warn("failed allocate ids for sync task", zap.Error(err))
return
}
if inserts, err = bw.writeInserts(ctx, pack); err != nil {
log.Error("failed to write insert data", zap.Error(err))
return
}
if stats, err = bw.writeStats(ctx, pack); err != nil {
log.Error("failed to process stats blob", zap.Error(err))
return
}
if deltas, err = bw.writeDelta(ctx, pack); err != nil {
log.Error("failed to process delta blob", zap.Error(err))
return
}
if bm25Stats, err = bw.writeBM25Stasts(ctx, pack); err != nil {
log.Error("failed to process bm25 stats blob", zap.Error(err))
return
}
size = bw.sizeWritten
return
}
// prefetchIDs pre-allocates IDs depending on the number of blobs the current task contains.
func (bw *BulkPackWriter) prefetchIDs(pack *SyncPack) error {
totalIDCount := 0
if len(pack.insertData) > 0 {
totalIDCount += len(pack.insertData[0].Data) * 2 // binlogs and statslogs
}
if pack.isFlush {
totalIDCount++ // merged stats log
}
if pack.deltaData != nil {
totalIDCount++
}
if pack.bm25Stats != nil {
totalIDCount += len(pack.bm25Stats)
if pack.isFlush {
totalIDCount++ // merged bm25 stats
}
}
if totalIDCount == 0 {
return nil
}
start, _, err := bw.allocator.Alloc(uint32(totalIDCount))
if err != nil {
return err
}
bw.ids = lo.RangeFrom(start, totalIDCount)
return nil
}
func (bw *BulkPackWriter) nextID() int64 {
if len(bw.ids) == 0 {
panic("pre-fetched ids exhausted")
}
r := bw.ids[0]
bw.ids = bw.ids[1:]
return r
}
func (bw *BulkPackWriter) writeLog(ctx context.Context, blob *storage.Blob,
root, p string, pack *SyncPack,
) (*datapb.Binlog, error) {
key := path.Join(bw.chunkManager.RootPath(), root, p)
err := retry.Do(ctx, func() error {
return bw.chunkManager.Write(ctx, key, blob.Value)
}, bw.writeRetryOpts...)
if err != nil {
return nil, err
}
size := int64(len(blob.GetValue()))
bw.sizeWritten += size
return &datapb.Binlog{
EntriesNum: blob.RowNum,
TimestampFrom: pack.tsFrom,
TimestampTo: pack.tsTo,
LogPath: key,
LogSize: size,
MemorySize: blob.MemorySize,
}, nil
}
func (bw *BulkPackWriter) writeInserts(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, error) {
if len(pack.insertData) == 0 {
return make(map[int64]*datapb.FieldBinlog), nil
}
serializer, err := NewStorageSerializer(bw.metaCache)
if err != nil {
return nil, err
}
binlogBlobs, err := serializer.serializeBinlog(ctx, pack)
if err != nil {
return nil, err
}
logs := make(map[int64]*datapb.FieldBinlog)
for fieldID, blob := range binlogBlobs {
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, bw.nextID())
binlog, err := bw.writeLog(ctx, blob, common.SegmentInsertLogPath, k, pack)
if err != nil {
return nil, err
}
logs[fieldID] = &datapb.FieldBinlog{
FieldID: fieldID,
Binlogs: []*datapb.Binlog{binlog},
}
}
return logs, nil
}
func (bw *BulkPackWriter) writeStats(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, error) {
if len(pack.insertData) == 0 {
return make(map[int64]*datapb.FieldBinlog), nil
}
serializer, err := NewStorageSerializer(bw.metaCache)
if err != nil {
return nil, err
}
singlePKStats, batchStatsBlob, err := serializer.serializeStatslog(pack)
if err != nil {
return nil, err
}
actions := []metacache.SegmentAction{metacache.RollStats(singlePKStats)}
bw.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(pack.segmentID))
pkFieldID := serializer.pkField.GetFieldID()
binlogs := make([]*datapb.Binlog, 0)
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, pkFieldID, bw.nextID())
if binlog, err := bw.writeLog(ctx, batchStatsBlob, common.SegmentStatslogPath, k, pack); err != nil {
return nil, err
} else {
binlogs = append(binlogs, binlog)
}
if pack.isFlush && pack.level != datapb.SegmentLevel_L0 {
mergedStatsBlob, err := serializer.serializeMergedPkStats(pack)
if err != nil {
return nil, err
}
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, pkFieldID, int64(storage.CompoundStatsType))
binlog, err := bw.writeLog(ctx, mergedStatsBlob, common.SegmentStatslogPath, k, pack)
if err != nil {
return nil, err
}
binlogs = append(binlogs, binlog)
}
logs := make(map[int64]*datapb.FieldBinlog)
logs[pkFieldID] = &datapb.FieldBinlog{
FieldID: pkFieldID,
Binlogs: binlogs,
}
return logs, nil
}
func (bw *BulkPackWriter) writeBM25Stasts(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, error) {
if len(pack.bm25Stats) == 0 {
return make(map[int64]*datapb.FieldBinlog), nil
}
serializer, err := NewStorageSerializer(bw.metaCache)
if err != nil {
return nil, err
}
bm25Blobs, err := serializer.serializeBM25Stats(pack)
if err != nil {
return nil, err
}
logs := make(map[int64]*datapb.FieldBinlog)
for fieldID, blob := range bm25Blobs {
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, bw.nextID())
binlog, err := bw.writeLog(ctx, blob, common.SegmentBm25LogPath, k, pack)
if err != nil {
return nil, err
}
logs[fieldID] = &datapb.FieldBinlog{
FieldID: fieldID,
Binlogs: []*datapb.Binlog{binlog},
}
}
actions := []metacache.SegmentAction{metacache.MergeBm25Stats(pack.bm25Stats)}
bw.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(pack.segmentID))
if pack.isFlush {
if pack.level != datapb.SegmentLevel_L0 {
if hasBM25Function(bw.metaCache.Schema()) {
mergedBM25Blob, err := serializer.serializeMergedBM25Stats(pack)
if err != nil {
return nil, err
}
for fieldID, blob := range mergedBM25Blob {
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, int64(storage.CompoundStatsType))
binlog, err := bw.writeLog(ctx, blob, common.SegmentBm25LogPath, k, pack)
if err != nil {
return nil, err
}
fieldBinlog, ok := logs[fieldID]
if !ok {
fieldBinlog = &datapb.FieldBinlog{
FieldID: fieldID,
}
logs[fieldID] = fieldBinlog
}
fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, binlog)
}
}
}
}
return logs, nil
}
func (bw *BulkPackWriter) writeDelta(ctx context.Context, pack *SyncPack) (*datapb.FieldBinlog, error) {
if pack.deltaData == nil {
return &datapb.FieldBinlog{}, nil
}
s, err := NewStorageSerializer(bw.metaCache)
if err != nil {
return nil, err
}
deltaBlob, err := s.serializeDeltalog(pack)
if err != nil {
return nil, err
}
k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, bw.nextID())
deltalog, err := bw.writeLog(ctx, deltaBlob, common.SegmentDeltaLogPath, k, pack)
if err != nil {
return nil, err
}
return &datapb.FieldBinlog{
FieldID: s.pkField.GetFieldID(),
Binlogs: []*datapb.Binlog{deltalog},
}, nil
}
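
To make the new data path easier to follow before the SyncTask.Run changes later in this diff, a minimal sketch (assumed to live in this syncmgr package) of how the task delegates to BulkPackWriter; the helper name is hypothetical and error handling is trimmed.

package syncmgr

import (
	"context"

	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
	"github.com/milvus-io/milvus/internal/storage"
)

// writePack is a hypothetical wrapper: a single Write call serializes the pack
// and uploads insert/stats/delta/bm25 logs, returning binlog metadata plus the
// total bytes written, which SyncTask.Run keeps and later hands to the MetaWriter.
func writePack(ctx context.Context, mc metacache.MetaCache, cm storage.ChunkManager,
	alloc allocator.Interface, pack *SyncPack,
) (int64, error) {
	writer := NewBulkPackWriter(mc, cm, alloc) // optional retry.Option args omitted here
	inserts, deltas, stats, bm25Stats, size, err := writer.Write(ctx, pack)
	if err != nil {
		return 0, err
	}
	// In SyncTask.Run these populate insertBinlogs, deltaBinlog, statsBinlogs and
	// bm25Binlogs before MetaWriter.UpdateSync reports the saved paths.
	_, _, _, _ = inserts, deltas, stats, bm25Stats
	return size, nil
}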


@ -0,0 +1,190 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package syncmgr
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
func TestNextID(t *testing.T) {
al := allocator.NewMockGIDAllocator()
i := int64(0)
al.AllocF = func(count uint32) (int64, int64, error) {
rt := i
i += int64(count)
return rt, int64(count), nil
}
al.AllocOneF = func() (allocator.UniqueID, error) {
rt := i
i++
return rt, nil
}
bw := NewBulkPackWriter(nil, nil, al)
bw.prefetchIDs(new(SyncPack).WithFlush())
t.Run("normal_next", func(t *testing.T) {
id := bw.nextID()
assert.Equal(t, int64(0), id)
})
t.Run("id_exhausted", func(t *testing.T) {
assert.Panics(t, func() {
bw.nextID()
})
})
}
func TestBulkPackWriter_Write(t *testing.T) {
paramtable.Get().Init(paramtable.NewBaseTable())
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, nil, nil)
metacache.UpdateNumOfRows(1000)(seg)
collectionID := int64(123)
partitionID := int64(456)
segmentID := int64(789)
channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collectionID)
schema := &schemapb.CollectionSchema{
Name: "sync_task_test_col",
Fields: []*schemapb.FieldSchema{
{FieldID: common.RowIDField, DataType: schemapb.DataType_Int64},
{FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64},
{
FieldID: 100,
Name: "pk",
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
},
{
FieldID: 101,
Name: "vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
mc := metacache.NewMockMetaCache(t)
mc.EXPECT().Collection().Return(collectionID).Maybe()
mc.EXPECT().Schema().Return(schema).Maybe()
mc.EXPECT().GetSegmentByID(segmentID).Return(seg, true).Maybe()
mc.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}).Maybe()
mc.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) {
action(seg)
}).Return().Maybe()
cm := mocks.NewChunkManager(t)
cm.EXPECT().RootPath().Return("files").Maybe()
cm.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
deletes := &storage.DeleteData{}
for i := 0; i < 10; i++ {
pk := storage.NewInt64PrimaryKey(int64(i + 1))
ts := tsoutil.ComposeTSByTime(time.Now(), 0)
deletes.Append(pk, ts)
}
bw := &BulkPackWriter{
metaCache: mc,
chunkManager: cm,
allocator: allocator.NewLocalAllocator(10000, 100000),
}
tests := []struct {
name string
pack *SyncPack
wantInserts map[int64]*datapb.FieldBinlog
wantDeltas *datapb.FieldBinlog
wantStats map[int64]*datapb.FieldBinlog
wantBm25Stats map[int64]*datapb.FieldBinlog
wantSize int64
wantErr error
}{
{
name: "empty",
pack: new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName),
wantInserts: map[int64]*datapb.FieldBinlog{},
wantDeltas: &datapb.FieldBinlog{},
wantStats: map[int64]*datapb.FieldBinlog{},
wantBm25Stats: map[int64]*datapb.FieldBinlog{},
wantSize: 0,
wantErr: nil,
},
{
name: "with delete",
pack: new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName).WithDeleteData(deletes),
wantInserts: map[int64]*datapb.FieldBinlog{},
wantDeltas: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 10,
LogPath: "files/delta_log/123/456/789/10000",
LogSize: 642,
MemorySize: 433,
},
},
},
wantStats: map[int64]*datapb.FieldBinlog{},
wantBm25Stats: map[int64]*datapb.FieldBinlog{},
wantSize: 642,
wantErr: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotInserts, gotDeltas, gotStats, gotBm25Stats, gotSize, err := bw.Write(context.Background(), tt.pack)
if err != tt.wantErr {
t.Errorf("BulkPackWriter.Write() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotInserts, tt.wantInserts) {
t.Errorf("BulkPackWriter.Write() gotInserts = %v, want %v", gotInserts, tt.wantInserts)
}
if !reflect.DeepEqual(gotDeltas, tt.wantDeltas) {
t.Errorf("BulkPackWriter.Write() gotDeltas = %v, want %v", gotDeltas, tt.wantDeltas)
}
if !reflect.DeepEqual(gotStats, tt.wantStats) {
t.Errorf("BulkPackWriter.Write() gotStats = %v, want %v", gotStats, tt.wantStats)
}
if !reflect.DeepEqual(gotBm25Stats, tt.wantBm25Stats) {
t.Errorf("BulkPackWriter.Write() gotBm25Stats = %v, want %v", gotBm25Stats, tt.wantBm25Stats)
}
if gotSize != tt.wantSize {
t.Errorf("BulkPackWriter.Write() gotSize = %v, want %v", gotSize, tt.wantSize)
}
})
}
}


@ -25,32 +25,23 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
type storageV1Serializer struct {
collectionID int64
schema *schemapb.CollectionSchema
pkField *schemapb.FieldSchema
schema *schemapb.CollectionSchema
pkField *schemapb.FieldSchema
inCodec *storage.InsertCodec
allocator allocator.Interface
metacache metacache.MetaCache
metaWriter MetaWriter
metacache metacache.MetaCache
}
func NewStorageSerializer(allocator allocator.Interface, metacache metacache.MetaCache, metaWriter MetaWriter) (*storageV1Serializer, error) {
collectionID := metacache.Collection()
func NewStorageSerializer(metacache metacache.MetaCache) (*storageV1Serializer, error) {
schema := metacache.Schema()
pkField := lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() })
if pkField == nil {
@ -58,130 +49,21 @@ func NewStorageSerializer(allocator allocator.Interface, metacache metacache.Met
}
meta := &etcdpb.CollectionMeta{
Schema: schema,
ID: collectionID,
}
inCodec := storage.NewInsertCodecWithSchema(meta)
return &storageV1Serializer{
collectionID: collectionID,
schema: schema,
pkField: pkField,
schema: schema,
pkField: pkField,
inCodec: inCodec,
allocator: allocator,
metacache: metacache,
metaWriter: metaWriter,
inCodec: inCodec,
metacache: metacache,
}, nil
}
func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) {
task := NewSyncTask()
tr := timerecord.NewTimeRecorder("storage_serializer")
log := log.Ctx(ctx).With(
zap.Int64("segmentID", pack.segmentID),
zap.Int64("collectionID", pack.collectionID),
zap.String("channel", pack.channelName),
)
if len(pack.insertData) > 0 {
memSize := make(map[int64]int64)
for _, chunk := range pack.insertData {
for fieldID, fieldData := range chunk.Data {
memSize[fieldID] += int64(fieldData.GetMemorySize())
}
}
task.binlogMemsize = memSize
binlogBlobs, err := s.serializeBinlog(ctx, pack)
if err != nil {
log.Warn("failed to serialize binlog", zap.Error(err))
return nil, err
}
task.binlogBlobs = binlogBlobs
actions := []metacache.SegmentAction{}
singlePKStats, batchStatsBlob, err := s.serializeStatslog(pack)
if err != nil {
log.Warn("failed to serialized statslog", zap.Error(err))
return nil, err
}
task.batchStatsBlob = batchStatsBlob
actions = append(actions, metacache.RollStats(singlePKStats))
if len(pack.bm25Stats) > 0 {
statsBlobs, err := s.serializeBM25Stats(pack)
if err != nil {
return nil, err
}
task.bm25Blobs = statsBlobs
actions = append(actions, metacache.MergeBm25Stats(pack.bm25Stats))
}
s.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(pack.segmentID))
}
if pack.isFlush {
if pack.level != datapb.SegmentLevel_L0 {
mergedStatsBlob, err := s.serializeMergedPkStats(pack)
if err != nil {
log.Warn("failed to serialize merged stats log", zap.Error(err))
return nil, err
}
task.mergedStatsBlob = mergedStatsBlob
if hasBM25Function(s.schema) {
mergedBM25Blob, err := s.serializeMergedBM25Stats(pack)
if err != nil {
log.Warn("failed to serialize merged bm25 stats log", zap.Error(err))
return nil, err
}
task.mergedBm25Blob = mergedBM25Blob
}
}
task.WithFlush()
}
if pack.deltaData != nil {
deltaBlob, err := s.serializeDeltalog(pack)
if err != nil {
log.Warn("failed to serialize delta log", zap.Error(err))
return nil, err
}
task.deltaBlob = deltaBlob
task.deltaRowCount = pack.deltaData.RowCount
}
if pack.isDrop {
task.WithDrop()
}
s.setTaskMeta(task, pack)
task.WithAllocator(s.allocator)
metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pack.level.String()).Observe(float64(tr.RecordSpan().Milliseconds()))
return task, nil
}
func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) {
task.WithCollectionID(pack.collectionID).
WithPartitionID(pack.partitionID).
WithChannelName(pack.channelName).
WithSegmentID(pack.segmentID).
WithBatchRows(pack.batchRows).
WithSchema(s.metacache.Schema()).
WithStartPosition(pack.startPosition).
WithCheckpoint(pack.checkpoint).
WithLevel(pack.level).
WithDataSource(pack.dataSource).
WithTimeRange(pack.tsFrom, pack.tsTo).
WithMetaCache(s.metacache).
WithMetaWriter(s.metaWriter).
WithFailureCallback(pack.errHandler)
}
func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) {
if len(pack.insertData) == 0 {
return make(map[int64]*storage.Blob), nil
}
log := log.Ctx(ctx)
blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData...)
if err != nil {
@ -202,6 +84,9 @@ func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPac
}
func (s *storageV1Serializer) serializeBM25Stats(pack *SyncPack) (map[int64]*storage.Blob, error) {
if len(pack.bm25Stats) == 0 {
return make(map[int64]*storage.Blob), nil
}
blobs := make(map[int64]*storage.Blob)
for fieldID, stats := range pack.bm25Stats {
bytes, err := stats.Serialize()
@ -219,6 +104,9 @@ func (s *storageV1Serializer) serializeBM25Stats(pack *SyncPack) (map[int64]*sto
}
func (s *storageV1Serializer) serializeStatslog(pack *SyncPack) (*storage.PrimaryKeyStats, *storage.Blob, error) {
if len(pack.insertData) == 0 {
return nil, nil, nil
}
var rowNum int64
var pkFieldData []storage.FieldData
for _, chunk := range pack.insertData {


@ -24,7 +24,6 @@ import (
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -92,11 +91,10 @@ func (s *StorageV1SerializerSuite) SetupSuite() {
}
func (s *StorageV1SerializerSuite) SetupTest() {
s.mockCache.EXPECT().Collection().Return(s.collectionID)
s.mockCache.EXPECT().Schema().Return(s.schema)
var err error
s.serializer, err = NewStorageSerializer(s.mockAllocator, s.mockCache, s.mockMetaWriter)
s.serializer, err = NewStorageSerializer(s.mockCache)
s.Require().NoError(err)
}
@ -179,20 +177,8 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
pack.WithTimeRange(50, 100)
pack.WithDrop()
task, err := s.serializer.EncodeBuffer(ctx, pack)
_, err := s.serializer.serializeBinlog(ctx, pack)
s.NoError(err)
taskV1, ok := task.(*SyncTask)
s.Require().True(ok)
s.Equal(s.collectionID, taskV1.collectionID)
s.Equal(s.partitionID, taskV1.partitionID)
s.Equal(s.channelName, taskV1.channelName)
s.Equal(&msgpb.MsgPosition{
Timestamp: 1000,
ChannelName: s.channelName,
}, taskV1.checkpoint)
s.EqualValues(50, taskV1.tsFrom)
s.EqualValues(100, taskV1.tsTo)
s.True(taskV1.isDrop)
})
s.Run("with_empty_data", func() {
@ -200,7 +186,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
pack.WithTimeRange(50, 100)
pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchRows(0)
_, err := s.serializer.EncodeBuffer(ctx, pack)
_, err := s.serializer.serializeBinlog(ctx, pack)
s.Error(err)
})
@ -209,32 +195,21 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
pack.WithTimeRange(50, 100)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchRows(10)
s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once()
task, err := s.serializer.EncodeBuffer(ctx, pack)
blobs, err := s.serializer.serializeBinlog(ctx, pack)
s.NoError(err)
taskV1, ok := task.(*SyncTask)
s.Require().True(ok)
s.Equal(s.collectionID, taskV1.collectionID)
s.Equal(s.partitionID, taskV1.partitionID)
s.Equal(s.channelName, taskV1.channelName)
s.Equal(&msgpb.MsgPosition{
Timestamp: 1000,
ChannelName: s.channelName,
}, taskV1.checkpoint)
s.EqualValues(50, taskV1.tsFrom)
s.EqualValues(100, taskV1.tsTo)
s.Len(taskV1.binlogBlobs, 4)
s.NotNil(taskV1.batchStatsBlob)
s.Len(blobs, 4)
stats, blob, err := s.serializer.serializeStatslog(pack)
s.NoError(err)
s.NotNil(stats)
s.NotNil(blob)
})
s.Run("with_flush_segment_not_found", func() {
pack := s.getBasicPack()
pack.WithFlush()
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithFlush()
s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false).Once()
_, err := s.serializer.EncodeBuffer(ctx, pack)
_, err := s.serializer.serializeMergedPkStats(pack)
s.Error(err)
})
@ -247,63 +222,39 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
bfs := s.getBfs()
segInfo := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
metacache.UpdateNumOfRows(1000)(segInfo)
s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) {
action(segInfo)
}).Return().Once()
s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(segInfo, true).Once()
s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(segInfo, true)
task, err := s.serializer.EncodeBuffer(ctx, pack)
blobs, err := s.serializer.serializeBinlog(ctx, pack)
s.NoError(err)
taskV1, ok := task.(*SyncTask)
s.Require().True(ok)
s.Equal(s.collectionID, taskV1.collectionID)
s.Equal(s.partitionID, taskV1.partitionID)
s.Equal(s.channelName, taskV1.channelName)
s.Equal(&msgpb.MsgPosition{
Timestamp: 1000,
ChannelName: s.channelName,
}, taskV1.checkpoint)
s.EqualValues(50, taskV1.tsFrom)
s.EqualValues(100, taskV1.tsTo)
s.Len(taskV1.binlogBlobs, 4)
s.NotNil(taskV1.batchStatsBlob)
s.NotNil(taskV1.mergedStatsBlob)
s.Len(blobs, 4)
stats, blob, err := s.serializer.serializeStatslog(pack)
s.NoError(err)
s.NotNil(stats)
s.NotNil(blob)
action := metacache.RollStats(stats)
action(segInfo)
blob, err = s.serializer.serializeMergedPkStats(pack)
s.NoError(err)
s.NotNil(blob)
})
}
func (s *StorageV1SerializerSuite) TestSerializeDelete() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.Run("serialize_normal", func() {
pack := s.getBasicPack()
pack.WithDeleteData(s.getDeleteBuffer())
pack.WithTimeRange(50, 100)
task, err := s.serializer.EncodeBuffer(ctx, pack)
blob, err := s.serializer.serializeDeltalog(pack)
s.NoError(err)
taskV1, ok := task.(*SyncTask)
s.Require().True(ok)
s.Equal(s.collectionID, taskV1.collectionID)
s.Equal(s.partitionID, taskV1.partitionID)
s.Equal(s.channelName, taskV1.channelName)
s.Equal(&msgpb.MsgPosition{
Timestamp: 1000,
ChannelName: s.channelName,
}, taskV1.checkpoint)
s.EqualValues(50, taskV1.tsFrom)
s.EqualValues(100, taskV1.tsTo)
s.NotNil(taskV1.deltaBlob)
s.NotNil(blob)
})
}
func (s *StorageV1SerializerSuite) TestBadSchema() {
mockCache := metacache.NewMockMetaCache(s.T())
mockCache.EXPECT().Collection().Return(s.collectionID).Once()
mockCache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}).Once()
_, err := NewStorageSerializer(s.mockAllocator, mockCache, s.mockMetaWriter)
_, err := NewStorageSerializer(mockCache)
s.Error(err)
}


@ -2,13 +2,11 @@ package syncmgr
import (
"context"
"math/rand"
"strconv"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -23,13 +21,11 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type SyncManagerSuite struct {
@ -95,59 +91,15 @@ func (s *SyncManagerSuite) SetupTest() {
s.metacache = metacache.NewMockMetaCache(s.T())
}
func (s *SyncManagerSuite) getEmptyInsertBuffer() *storage.InsertData {
buf, err := storage.NewInsertData(s.schema)
s.Require().NoError(err)
return buf
}
func (s *SyncManagerSuite) getInsertBuffer() *storage.InsertData {
buf := s.getEmptyInsertBuffer()
// generate data
for i := 0; i < 10; i++ {
data := make(map[storage.FieldID]any)
data[common.RowIDField] = int64(i + 1)
data[common.TimeStampField] = int64(i + 1)
data[100] = int64(i + 1)
vector := lo.RepeatBy(128, func(_ int) float32 {
return rand.Float32()
})
data[101] = vector
err := buf.Append(data)
s.Require().NoError(err)
}
return buf
}
func (s *SyncManagerSuite) getDeleteBuffer() *storage.DeleteData {
buf := &storage.DeleteData{}
for i := 0; i < 10; i++ {
pk := storage.NewInt64PrimaryKey(int64(i + 1))
ts := tsoutil.ComposeTSByTime(time.Now(), 0)
buf.Append(pk, ts)
}
return buf
}
func (s *SyncManagerSuite) getDeleteBufferZeroTs() *storage.DeleteData {
buf := &storage.DeleteData{}
for i := 0; i < 10; i++ {
pk := storage.NewInt64PrimaryKey(int64(i + 1))
buf.Append(pk, 0)
}
return buf
}
func (s *SyncManagerSuite) getSuiteSyncTask() *SyncTask {
task := NewSyncTask().WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithSegmentID(s.segmentID).
WithChannelName(s.channelName).
WithSchema(s.schema).
WithChunkManager(s.chunkManager).
task := NewSyncTask().
WithSyncPack(new(SyncPack).
WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithSegmentID(s.segmentID).
WithChannelName(s.channelName)).
WithAllocator(s.allocator).
WithChunkManager(s.chunkManager).
WithMetaCache(s.metacache).
WithAllocator(s.allocator)
@ -166,12 +118,6 @@ func (s *SyncManagerSuite) TestSubmit() {
manager := NewSyncManager(s.chunkManager)
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
@ -206,12 +152,6 @@ func (s *SyncManagerSuite) TestCompacted() {
manager := NewSyncManager(s.chunkManager)
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
@ -295,7 +235,6 @@ func (s *SyncManagerSuite) TestSyncManager_TaskStatsJSON() {
collectionID: 1,
partitionID: 1,
channelName: "channel1",
schema: &schemapb.CollectionSchema{},
checkpoint: &msgpb.MsgPosition{},
tsFrom: 1000,
tsTo: 2000,
@ -306,7 +245,6 @@ func (s *SyncManagerSuite) TestSyncManager_TaskStatsJSON() {
collectionID: 2,
partitionID: 2,
channelName: "channel2",
schema: &schemapb.CollectionSchema{},
checkpoint: &msgpb.MsgPosition{},
tsFrom: 3000,
tsTo: 4000,


@ -19,25 +19,20 @@ package syncmgr
import (
"context"
"fmt"
"path"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
@ -50,13 +45,10 @@ type SyncTask struct {
chunkManager storage.ChunkManager
allocator allocator.Interface
segment *metacache.SegmentInfo
collectionID int64
partitionID int64
segmentID int64
channelName string
schema *schemapb.CollectionSchema
pkField *schemapb.FieldSchema
startPosition *msgpb.MsgPosition
checkpoint *msgpb.MsgPosition
dataSource string
@ -68,34 +60,16 @@ type SyncTask struct {
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
isFlush bool
isDrop bool
metacache metacache.MetaCache
metaWriter MetaWriter
pack *SyncPack
insertBinlogs map[int64]*datapb.FieldBinlog // map[int64]*datapb.Binlog
statsBinlogs map[int64]*datapb.FieldBinlog // map[int64]*datapb.Binlog
bm25Binlogs map[int64]*datapb.FieldBinlog
deltaBinlog *datapb.FieldBinlog
binlogBlobs map[int64]*storage.Blob // fieldID => blob
binlogMemsize map[int64]int64 // memory size
bm25Blobs map[int64]*storage.Blob
mergedBm25Blob map[int64]*storage.Blob
batchStatsBlob *storage.Blob
mergedStatsBlob *storage.Blob
deltaBlob *storage.Blob
deltaRowCount int64
// prefetched log ids
ids []int64
segmentData map[string][]byte
writeRetryOpts []retry.Option
failureCallback func(err error)
@ -122,7 +96,7 @@ func (t *SyncTask) HandleError(err error) {
}
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, t.level.String()).Inc()
if !t.isFlush {
if !t.pack.isFlush {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, t.level.String()).Inc()
}
}
@ -137,10 +111,9 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
}
}()
var has bool
t.segment, has = t.metacache.GetSegmentByID(t.segmentID)
_, has := t.metacache.GetSegmentByID(t.segmentID)
if !has {
if t.isDrop {
if t.pack.isDrop {
log.Info("segment dropped, discard sync task")
return nil
}
@ -149,35 +122,13 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
return err
}
err = t.prefetchIDs()
writer := NewBulkPackWriter(t.metacache, t.chunkManager, t.allocator, t.writeRetryOpts...)
t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.flushedSize, err = writer.Write(ctx, t.pack)
if err != nil {
log.Warn("failed allocate ids for sync task", zap.Error(err))
log.Warn("failed to write sync data", zap.Error(err))
return err
}
t.processInsertBlobs()
t.processStatsBlob()
t.processDeltaBlob()
if len(t.bm25Blobs) > 0 || len(t.mergedBm25Blob) > 0 {
t.processBM25StastBlob()
}
err = t.writeLogs(ctx)
if err != nil {
log.Warn("failed to save serialized data into storage", zap.Error(err))
return err
}
var totalSize int64
for _, size := range t.binlogMemsize {
totalSize += size
}
if t.deltaBlob != nil {
totalSize += int64(len(t.deltaBlob.Value))
}
t.flushedSize = totalSize
metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize))
metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows))
@ -192,203 +143,27 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
}
actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)}
if t.isFlush {
if t.pack.isFlush {
actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
}
t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segment.SegmentID()))
t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segmentID))
if t.isDrop {
t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segment.SegmentID()))
log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName))
if t.pack.isDrop {
t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segmentID))
log.Info("segment removed", zap.Int64("segmentID", t.segmentID), zap.String("channel", t.channelName))
}
t.execTime = t.tr.ElapseSpan()
log.Info("task done", zap.Int64("flushedSize", totalSize), zap.Duration("timeTaken", t.execTime))
log.Info("task done", zap.Int64("flushedSize", t.flushedSize), zap.Duration("timeTaken", t.execTime))
if !t.isFlush {
if !t.pack.isFlush {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, t.level.String()).Inc()
}
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, t.level.String()).Inc()
// free blobs and data
t.binlogBlobs = nil
t.deltaBlob = nil
t.mergedStatsBlob = nil
t.batchStatsBlob = nil
t.segmentData = nil
return nil
}
// prefetchIDs pre-allocates IDs depending on the number of blobs the current task contains.
func (t *SyncTask) prefetchIDs() error {
totalIDCount := len(t.binlogBlobs)
if t.batchStatsBlob != nil {
totalIDCount++
}
if t.deltaBlob != nil {
totalIDCount++
}
if t.bm25Blobs != nil {
totalIDCount += len(t.bm25Blobs)
}
start, _, err := t.allocator.Alloc(uint32(totalIDCount))
if err != nil {
return err
}
t.ids = lo.RangeFrom(start, totalIDCount)
return nil
}
func (t *SyncTask) nextID() int64 {
if len(t.ids) == 0 {
panic("pre-fetched ids exhausted")
}
r := t.ids[0]
t.ids = t.ids[1:]
return r
}
func (t *SyncTask) processInsertBlobs() {
for fieldID, blob := range t.binlogBlobs {
k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, t.nextID())
key := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, k)
t.segmentData[key] = blob.GetValue()
t.appendBinlog(fieldID, &datapb.Binlog{
EntriesNum: blob.RowNum,
TimestampFrom: t.tsFrom,
TimestampTo: t.tsTo,
LogPath: key,
LogSize: int64(len(blob.GetValue())),
MemorySize: t.binlogMemsize[fieldID],
})
}
}
func (t *SyncTask) processBM25StastBlob() {
for fieldID, blob := range t.bm25Blobs {
k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, t.nextID())
key := path.Join(t.chunkManager.RootPath(), common.SegmentBm25LogPath, k)
t.segmentData[key] = blob.GetValue()
t.appendBM25Statslog(fieldID, &datapb.Binlog{
EntriesNum: blob.RowNum,
TimestampFrom: t.tsFrom,
TimestampTo: t.tsTo,
LogPath: key,
LogSize: int64(len(blob.GetValue())),
MemorySize: blob.MemorySize,
})
}
for fieldID, blob := range t.mergedBm25Blob {
k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, int64(storage.CompoundStatsType))
key := path.Join(t.chunkManager.RootPath(), common.SegmentBm25LogPath, k)
t.segmentData[key] = blob.GetValue()
t.appendBM25Statslog(fieldID, &datapb.Binlog{
EntriesNum: blob.RowNum,
TimestampFrom: t.tsFrom,
TimestampTo: t.tsTo,
LogPath: key,
LogSize: int64(len(blob.GetValue())),
MemorySize: blob.MemorySize,
})
}
}
func (t *SyncTask) processStatsBlob() {
if t.batchStatsBlob != nil {
t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), t.nextID(), t.batchRows)
}
if t.mergedStatsBlob != nil {
totalRowNum := t.segment.NumOfRows()
t.convertBlob2StatsBinlog(t.mergedStatsBlob, t.pkField.GetFieldID(), int64(storage.CompoundStatsType), totalRowNum)
}
}
func (t *SyncTask) processDeltaBlob() {
if t.deltaBlob != nil {
value := t.deltaBlob.GetValue()
data := &datapb.Binlog{}
blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, t.nextID())
blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
t.segmentData[blobPath] = value
data.LogSize = int64(len(t.deltaBlob.Value))
data.LogPath = blobPath
data.TimestampFrom = t.tsFrom
data.TimestampTo = t.tsTo
data.EntriesNum = t.deltaRowCount
data.MemorySize = t.deltaBlob.GetMemorySize()
t.appendDeltalog(data)
}
}
func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID int64, rowNum int64) {
key := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, logID)
key = path.Join(t.chunkManager.RootPath(), common.SegmentStatslogPath, key)
value := blob.GetValue()
t.segmentData[key] = value
t.appendStatslog(fieldID, &datapb.Binlog{
EntriesNum: rowNum,
TimestampFrom: t.tsFrom,
TimestampTo: t.tsTo,
LogPath: key,
LogSize: int64(len(value)),
MemorySize: int64(len(value)),
})
}
func (t *SyncTask) appendBinlog(fieldID int64, binlog *datapb.Binlog) {
fieldBinlog, ok := t.insertBinlogs[fieldID]
if !ok {
fieldBinlog = &datapb.FieldBinlog{
FieldID: fieldID,
}
t.insertBinlogs[fieldID] = fieldBinlog
}
fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, binlog)
}
func (t *SyncTask) appendBM25Statslog(fieldID int64, log *datapb.Binlog) {
fieldBinlog, ok := t.bm25Binlogs[fieldID]
if !ok {
fieldBinlog = &datapb.FieldBinlog{
FieldID: fieldID,
}
t.bm25Binlogs[fieldID] = fieldBinlog
}
fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, log)
}
func (t *SyncTask) appendStatslog(fieldID int64, statlog *datapb.Binlog) {
fieldBinlog, ok := t.statsBinlogs[fieldID]
if !ok {
fieldBinlog = &datapb.FieldBinlog{
FieldID: fieldID,
}
t.statsBinlogs[fieldID] = fieldBinlog
}
fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, statlog)
}
func (t *SyncTask) appendDeltalog(deltalog *datapb.Binlog) {
t.deltaBinlog.Binlogs = append(t.deltaBinlog.Binlogs, deltalog)
}
// writeLogs writes log files (binlog/deltalog/statslog) into storage via the chunkManager.
func (t *SyncTask) writeLogs(ctx context.Context) error {
return retry.Handle(ctx, func() (bool, error) {
err := t.chunkManager.MultiWrite(ctx, t.segmentData)
if err != nil {
return !merr.IsCanceledOrTimeout(err), err
}
return false, nil
}, t.writeRetryOpts...)
}
// writeMeta updates segments via meta writer in option.
func (t *SyncTask) writeMeta(ctx context.Context) error {
return t.metaWriter.UpdateSync(ctx, t)
@ -411,7 +186,7 @@ func (t *SyncTask) ChannelName() string {
}
func (t *SyncTask) IsFlush() bool {
return t.isFlush
return t.pack.isFlush
}
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog, map[int64]*datapb.FieldBinlog) {
@ -419,13 +194,17 @@ func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.F
}
func (t *SyncTask) MarshalJSON() ([]byte, error) {
deltaRowCount := int64(0)
if t.pack != nil && t.pack.deltaData != nil {
deltaRowCount = t.pack.deltaData.RowCount
}
return json.Marshal(&metricsinfo.SyncTask{
SegmentID: t.segmentID,
BatchRows: t.batchRows,
SegmentLevel: t.level.String(),
TSFrom: tsoutil.PhysicalTimeFormat(t.tsFrom),
TSTo: tsoutil.PhysicalTimeFormat(t.tsTo),
DeltaRowCount: t.deltaRowCount,
DeltaRowCount: deltaRowCount,
FlushSize: t.flushedSize,
RunningTime: t.execTime.String(),
NodeID: paramtable.GetNodeID(),


@ -102,10 +102,12 @@ func (s *SyncTaskSuite) SetupTest() {
s.chunkManager = mocks.NewChunkManager(s.T())
s.chunkManager.EXPECT().RootPath().Return("files").Maybe()
s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil).Maybe()
s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker = broker.NewMockBroker(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.metacache.EXPECT().Collection().Return(s.collectionID).Maybe()
s.metacache.EXPECT().Schema().Return(s.schema).Maybe()
}
func (s *SyncTaskSuite) getEmptyInsertBuffer() *storage.InsertData {
@ -144,26 +146,16 @@ func (s *SyncTaskSuite) getDeleteBuffer() *storage.DeleteData {
return buf
}
func (s *SyncTaskSuite) getDeleteBufferZeroTs() *storage.DeleteData {
buf := &storage.DeleteData{}
for i := 0; i < 10; i++ {
pk := storage.NewInt64PrimaryKey(int64(i + 1))
buf.Append(pk, 0)
}
return buf
}
func (s *SyncTaskSuite) getSuiteSyncTask() *SyncTask {
task := NewSyncTask().WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithSegmentID(s.segmentID).
WithChannelName(s.channelName).
WithSchema(s.schema).
WithChunkManager(s.chunkManager).
func (s *SyncTaskSuite) getSuiteSyncTask(pack *SyncPack) *SyncTask {
task := NewSyncTask().
WithSyncPack(pack.
WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithSegmentID(s.segmentID).
WithChannelName(s.channelName)).
WithAllocator(s.allocator).
WithChunkManager(s.chunkManager).
WithMetaCache(s.metacache)
task.binlogMemsize = map[int64]int64{0: 1, 1: 1, 100: 100}
return task
}
@ -192,79 +184,64 @@ func (s *SyncTaskSuite) TestRunNormal() {
seg.GetBloomFilterSet().Roll()
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) {
action(seg)
}).Return()
s.Run("without_data", func() {
task := s.getSuiteSyncTask()
task := s.getSuiteSyncTask(new(SyncPack).WithCheckpoint(
&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err := task.Run(ctx)
s.NoError(err)
})
s.Run("with_insert_delete_cp", func() {
task := s.getSuiteSyncTask()
task.WithTimeRange(50, 100)
task := s.getSuiteSyncTask(
new(SyncPack).
WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
task.binlogBlobs[100] = &storage.Blob{
Key: "100",
Value: []byte("test_data"),
}
err := task.Run(ctx)
s.NoError(err)
})
s.Run("with_statslog", func() {
task := s.getSuiteSyncTask()
task.WithTimeRange(50, 100)
s.Run("with_flush", func() {
task := s.getSuiteSyncTask(
new(SyncPack).
WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).
WithFlush().
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
task.WithFlush()
task.batchStatsBlob = &storage.Blob{
Key: "100",
Value: []byte("test_data"),
}
task.mergedStatsBlob = &storage.Blob{
Key: "1",
Value: []byte("test_data"),
}
err := task.Run(ctx)
s.NoError(err)
})
s.Run("with_delta_data", func() {
s.Run("with_drop", func() {
s.metacache.EXPECT().RemoveSegments(mock.Anything, mock.Anything).Return(nil).Once()
task := s.getSuiteSyncTask()
task.WithTimeRange(50, 100)
task := s.getSuiteSyncTask(new(SyncPack).
WithDeleteData(s.getDeleteBuffer()).
WithDrop().
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
task.WithDrop()
task.deltaBlob = &storage.Blob{
Key: "100",
Value: []byte("test_data"),
}
err := task.Run(ctx)
s.NoError(err)
})
@ -281,19 +258,15 @@ func (s *SyncTaskSuite) TestRunL0Segment() {
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.Run("pure_delete_l0_flush", func() {
task := s.getSuiteSyncTask()
task.deltaBlob = &storage.Blob{
Key: "100",
Value: []byte("test_data"),
}
task.WithTimeRange(50, 100)
task := s.getSuiteSyncTask(new(SyncPack).
WithDeleteData(s.getDeleteBuffer()).
WithFlush().
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
task.WithFlush()
err := task.Run(ctx)
s.NoError(err)
@ -307,7 +280,7 @@ func (s *SyncTaskSuite) TestRunError() {
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false)
flag := false
handler := func(_ error) { flag = true }
task := s.getSuiteSyncTask().WithFailureCallback(handler)
task := s.getSuiteSyncTask(new(SyncPack)).WithFailureCallback(handler)
err := task.Run(ctx)
@ -320,13 +293,14 @@ func (s *SyncTaskSuite) TestRunError() {
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().Collection().Return(s.collectionID).Maybe()
s.metacache.EXPECT().Schema().Return(s.schema).Maybe()
s.Run("allocate_id_fail", func() {
mockAllocator := allocator.NewMockAllocator(s.T())
mockAllocator.EXPECT().Alloc(mock.Anything).Return(0, 0, errors.New("mocked"))
task := s.getSuiteSyncTask()
task.allocator = mockAllocator
task := s.getSuiteSyncTask(new(SyncPack).WithFlush()).WithAllocator(mockAllocator)
err := task.Run(ctx)
s.Error(err)
@ -335,14 +309,13 @@ func (s *SyncTaskSuite) TestRunError() {
s.Run("metawrite_fail", func() {
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked"))
task := s.getSuiteSyncTask()
task := s.getSuiteSyncTask(new(SyncPack).
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1, retry.Attempts(1)))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err := task.Run(ctx)
s.Error(err)
@ -353,14 +326,10 @@ func (s *SyncTaskSuite) TestRunError() {
handler := func(_ error) { flag = true }
s.chunkManager.ExpectedCalls = nil
s.chunkManager.EXPECT().RootPath().Return("files")
s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked")))
task := s.getSuiteSyncTask().WithFailureCallback(handler)
task.binlogBlobs[100] = &storage.Blob{
Key: "100",
Value: []byte("test_data"),
}
task.WithWriteRetryOptions(retry.Attempts(1))
s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked")))
task := s.getSuiteSyncTask(new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()})).
WithFailureCallback(handler).
WithWriteRetryOptions(retry.Attempts(1))
err := task.Run(ctx)
@ -369,43 +338,26 @@ func (s *SyncTaskSuite) TestRunError() {
})
}
func (s *SyncTaskSuite) TestNextID() {
task := s.getSuiteSyncTask()
task.ids = []int64{0}
s.Run("normal_next", func() {
id := task.nextID()
s.EqualValues(0, id)
})
s.Run("id_exhausted", func() {
s.Panics(func() {
task.nextID()
})
})
}
func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() {
t := &SyncTask{
segmentID: 12345,
batchRows: 100,
level: datapb.SegmentLevel_L0,
tsFrom: 1000,
tsTo: 2000,
deltaRowCount: 10,
flushedSize: 1024,
execTime: 2 * time.Second,
segmentID: 12345,
batchRows: 100,
level: datapb.SegmentLevel_L0,
tsFrom: 1000,
tsTo: 2000,
flushedSize: 1024,
execTime: 2 * time.Second,
}
tm := &metricsinfo.SyncTask{
SegmentID: t.segmentID,
BatchRows: t.batchRows,
SegmentLevel: t.level.String(),
TSFrom: tsoutil.PhysicalTimeFormat(t.tsFrom),
TSTo: tsoutil.PhysicalTimeFormat(t.tsTo),
DeltaRowCount: t.deltaRowCount,
FlushSize: t.flushedSize,
RunningTime: t.execTime.String(),
NodeID: paramtable.GetNodeID(),
SegmentID: t.segmentID,
BatchRows: t.batchRows,
SegmentLevel: t.level.String(),
TSFrom: tsoutil.PhysicalTimeFormat(t.tsFrom),
TSTo: tsoutil.PhysicalTimeFormat(t.tsTo),
FlushSize: t.flushedSize,
RunningTime: t.execTime.String(),
NodeID: paramtable.GetNodeID(),
}
expectedBytes, err := json.Marshal(tm)
s.NoError(err)

View File

@ -204,16 +204,6 @@ func (s *L0WriteBufferSuite) TestBufferData() {
})
}
func (s *L0WriteBufferSuite) TestCreateFailure() {
metacache := metacache.NewMockMetaCache(s.T())
metacache.EXPECT().Collection().Return(s.collID)
metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{})
_, err := NewL0WriteBuffer(s.channelName, metacache, s.syncMgr, &writeBufferOption{
idAllocator: s.allocator,
})
s.Error(err)
}
func TestL0WriteBuffer(t *testing.T) {
suite.Run(t, new(L0WriteBufferSuite))
}

View File

@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
@ -115,6 +116,7 @@ type writeBufferBase struct {
channelName string
metaWriter syncmgr.MetaWriter
allocator allocator.Interface
collSchema *schemapb.CollectionSchema
estSizePerRecord int
metaCache metacache.MetaCache
@ -125,7 +127,6 @@ type writeBufferBase struct {
syncPolicies []SyncPolicy
syncCheckpoint *checkpointCandidates
syncMgr syncmgr.SyncManager
serializer syncmgr.Serializer
checkpoint *msgpb.MsgPosition
flushTimestamp *atomic.Uint64
@ -143,17 +144,6 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr s
flushTsPolicy := GetFlushTsPolicy(flushTs, metacache)
option.syncPolicies = append(option.syncPolicies, flushTsPolicy)
var serializer syncmgr.Serializer
var err error
serializer, err = syncmgr.NewStorageSerializer(
option.idAllocator,
metacache,
option.metaWriter,
)
if err != nil {
return nil, err
}
schema := metacache.Schema()
estSize, err := typeutil.EstimateSizePerRecord(schema)
if err != nil {
@ -167,9 +157,9 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr s
estSizePerRecord: estSize,
syncMgr: syncMgr,
metaWriter: option.metaWriter,
allocator: option.idAllocator,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncCheckpoint: newCheckpointCandiates(),
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
@ -617,7 +607,12 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize)
return wb.serializer.EncodeBuffer(ctx, pack)
task := syncmgr.NewSyncTask().
WithAllocator(wb.allocator).
WithMetaWriter(wb.metaWriter).
WithMetaCache(wb.metaCache).
WithSyncPack(pack)
return task, nil
}
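As a usage sketch, the task built here is typically handed straight to the SyncManager, which serializes and uploads the pack when it runs the task. The helper below is hypothetical: it assumes it lives in the writebuffer package, omits SyncData's optional callbacks, and awaits the returned future synchronously.

// Sketch only: flushSegment is an illustrative caller, not part of this change.
func flushSegment(ctx context.Context, wb *writeBufferBase, segmentID int64) error {
	task, err := wb.getSyncTask(ctx, segmentID)
	if err != nil {
		return err
	}
	// Serialization now happens inside the task when the sync manager executes it.
	future, err := wb.syncMgr.SyncData(ctx, task)
	if err != nil {
		return err
	}
	_, err = future.Await()
	return err
}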
// getEstBatchSize returns the batch size based on the estimated size per record and the FlushBufferSize configuration value.

View File

@ -17,7 +17,6 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -225,39 +224,6 @@ func (s *WriteBufferSuite) TestGetCheckpoint() {
})
}
func (s *WriteBufferSuite) TestSyncSegmentsError() {
wb, err := newWriteBufferBase(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{
pkStatsFactory: func(vchannel *datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
},
})
s.Require().NoError(err)
serializer := syncmgr.NewMockSerializer(s.T())
wb.serializer = serializer
segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{
ID: 1,
}, nil, nil)
s.metacache.EXPECT().GetSegmentByID(int64(1)).Return(segment, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.Run("segment_not_found", func() {
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(nil, merr.WrapErrSegmentNotFound(1)).Once()
s.NotPanics(func() {
wb.syncSegments(context.Background(), []int64{1})
})
})
s.Run("other_err", func() {
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
s.Panics(func() {
wb.syncSegments(context.Background(), []int64{1})
})
})
}
func (s *WriteBufferSuite) TestEvictBuffer() {
wb, err := newWriteBufferBase(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{
pkStatsFactory: func(vchannel *datapb.SegmentInfo) pkoracle.PkStat {
@ -266,10 +232,6 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
})
s.Require().NoError(err)
serializer := syncmgr.NewMockSerializer(s.T())
wb.serializer = serializer
s.Run("no_checkpoint", func() {
wb.mut.Lock()
wb.buffers[100] = &segmentBuffer{}
@ -281,8 +243,6 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
}()
wb.EvictBuffer(GetOldestBufferPolicy(1))
serializer.AssertNotCalled(s.T(), "EncodeBuffer")
})
s.Run("trigger_sync", func() {
@ -314,7 +274,6 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
}, nil, nil)
s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) {
return struct{}{}, nil
}), nil)

View File

@ -146,9 +146,10 @@ func (insertCodec *InsertCodec) SerializePkStats(stats *PrimaryKeyStats, rowNum
buffer := statsWriter.GetBuffer()
return &Blob{
Key: blobKey,
Value: buffer,
RowNum: rowNum,
Key: blobKey,
Value: buffer,
RowNum: rowNum,
MemorySize: int64(len(buffer)),
}, nil
}
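Because the stats blob now carries its memory size, downstream binlog metadata can read it directly instead of recomputing it. The helper below is an illustrative sketch that mirrors convertBlob2StatsBinlog earlier in this diff; it assumes it sits in the same storage package as SerializePkStats and imports datapb from the path referenced elsewhere in this change.

// Sketch only: statsBinlogFromBlob is hypothetical; the field wiring mirrors the
// statslog conversion shown in the sync task file above.
func statsBinlogFromBlob(blob *Blob, logPath string, rowNum int64) *datapb.Binlog {
	value := blob.GetValue()
	return &datapb.Binlog{
		EntriesNum: rowNum,
		LogPath:    logPath,
		LogSize:    int64(len(value)),
		MemorySize: blob.MemorySize, // now populated by SerializePkStats
	}
}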

View File

@ -115,18 +115,6 @@ var (
collectionIDLabelName,
})
DataNodeEncodeBufferLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "encode_buffer_latency",
Help: "latency of encode buffer data",
Buckets: buckets,
}, []string{
nodeIDLabelName,
segmentLevelLabelName,
})
DataNodeSave2StorageLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
@ -268,7 +256,6 @@ func RegisterDataNode(registry *prometheus.Registry) {
registry.MustRegister(DataNodeFlowGraphBufferDataSize)
// output related
registry.MustRegister(DataNodeAutoFlushBufferCount)
registry.MustRegister(DataNodeEncodeBufferLatency)
registry.MustRegister(DataNodeSave2StorageLatency)
registry.MustRegister(DataNodeFlushBufferCount)
registry.MustRegister(DataNodeFlushReqCounter)