Add SyncManager to replace flush manager (#27873)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2023-10-31 02:30:16 +08:00 committed by GitHub
parent 7c9d24cbb7
commit 233bf90c55
17 changed files with 1522 additions and 10 deletions


@@ -429,6 +429,7 @@ generate-mockery-datacoord: getdeps
generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
$(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/datanode/broker --output=$(PWD)/internal/datanode/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage
+$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks


@@ -30,7 +30,7 @@ type BloomFilterSet struct {
history []*storage.PkStatistics
}
-func newBloomFilterSet() *BloomFilterSet {
+func NewBloomFilterSet() *BloomFilterSet {
return &BloomFilterSet{}
}


@@ -31,7 +31,7 @@ type BloomFilterSetSuite struct {
}
func (s *BloomFilterSetSuite) SetupTest() {
-s.bfs = newBloomFilterSet()
+s.bfs = NewBloomFilterSet()
}
func (s *BloomFilterSetSuite) TearDownSuite() {


@@ -59,11 +59,11 @@ func NewMetaCache(vchannel *datapb.VchannelInfo, factory PkStatsFactory) MetaCac
func (c *metaCacheImpl) init(vchannel *datapb.VchannelInfo, factory PkStatsFactory) {
for _, seg := range vchannel.FlushedSegments {
-c.segmentInfos[seg.GetID()] = newSegmentInfo(seg, factory(seg))
+c.segmentInfos[seg.GetID()] = NewSegmentInfo(seg, factory(seg))
}
for _, seg := range vchannel.UnflushedSegments {
-c.segmentInfos[seg.GetID()] = newSegmentInfo(seg, factory(seg))
+c.segmentInfos[seg.GetID()] = NewSegmentInfo(seg, factory(seg))
}
}


@@ -50,7 +50,7 @@ func (s *MetaCacheSuite) SetupSuite() {
s.newSegments = []int64{9, 10, 11, 12}
s.invaliedSeg = 111
s.bfsFactory = func(*datapb.SegmentInfo) *BloomFilterSet {
-return newBloomFilterSet()
+return NewBloomFilterSet()
}
}


@@ -0,0 +1,277 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package metacache
import mock "github.com/stretchr/testify/mock"
// MockMetaCache is an autogenerated mock type for the MetaCache type
type MockMetaCache struct {
mock.Mock
}
type MockMetaCache_Expecter struct {
mock *mock.Mock
}
func (_m *MockMetaCache) EXPECT() *MockMetaCache_Expecter {
return &MockMetaCache_Expecter{mock: &_m.Mock}
}
// CompactSegments provides a mock function with given fields: newSegmentID, partitionID, oldSegmentIDs
func (_m *MockMetaCache) CompactSegments(newSegmentID int64, partitionID int64, oldSegmentIDs ...int64) {
_va := make([]interface{}, len(oldSegmentIDs))
for _i := range oldSegmentIDs {
_va[_i] = oldSegmentIDs[_i]
}
var _ca []interface{}
_ca = append(_ca, newSegmentID, partitionID)
_ca = append(_ca, _va...)
_m.Called(_ca...)
}
// MockMetaCache_CompactSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CompactSegments'
type MockMetaCache_CompactSegments_Call struct {
*mock.Call
}
// CompactSegments is a helper method to define mock.On call
// - newSegmentID int64
// - partitionID int64
// - oldSegmentIDs ...int64
func (_e *MockMetaCache_Expecter) CompactSegments(newSegmentID interface{}, partitionID interface{}, oldSegmentIDs ...interface{}) *MockMetaCache_CompactSegments_Call {
return &MockMetaCache_CompactSegments_Call{Call: _e.mock.On("CompactSegments",
append([]interface{}{newSegmentID, partitionID}, oldSegmentIDs...)...)}
}
func (_c *MockMetaCache_CompactSegments_Call) Run(run func(newSegmentID int64, partitionID int64, oldSegmentIDs ...int64)) *MockMetaCache_CompactSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]int64, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(int64)
}
}
run(args[0].(int64), args[1].(int64), variadicArgs...)
})
return _c
}
func (_c *MockMetaCache_CompactSegments_Call) Return() *MockMetaCache_CompactSegments_Call {
_c.Call.Return()
return _c
}
func (_c *MockMetaCache_CompactSegments_Call) RunAndReturn(run func(int64, int64, ...int64)) *MockMetaCache_CompactSegments_Call {
_c.Call.Return(run)
return _c
}
// GetSegmentIDsBy provides a mock function with given fields: filters
func (_m *MockMetaCache) GetSegmentIDsBy(filters ...SegmentFilter) []int64 {
_va := make([]interface{}, len(filters))
for _i := range filters {
_va[_i] = filters[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 []int64
if rf, ok := ret.Get(0).(func(...SegmentFilter) []int64); ok {
r0 = rf(filters...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
return r0
}
// MockMetaCache_GetSegmentIDsBy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentIDsBy'
type MockMetaCache_GetSegmentIDsBy_Call struct {
*mock.Call
}
// GetSegmentIDsBy is a helper method to define mock.On call
// - filters ...SegmentFilter
func (_e *MockMetaCache_Expecter) GetSegmentIDsBy(filters ...interface{}) *MockMetaCache_GetSegmentIDsBy_Call {
return &MockMetaCache_GetSegmentIDsBy_Call{Call: _e.mock.On("GetSegmentIDsBy",
append([]interface{}{}, filters...)...)}
}
func (_c *MockMetaCache_GetSegmentIDsBy_Call) Run(run func(filters ...SegmentFilter)) *MockMetaCache_GetSegmentIDsBy_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]SegmentFilter, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(SegmentFilter)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockMetaCache_GetSegmentIDsBy_Call) Return(_a0 []int64) *MockMetaCache_GetSegmentIDsBy_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMetaCache_GetSegmentIDsBy_Call) RunAndReturn(run func(...SegmentFilter) []int64) *MockMetaCache_GetSegmentIDsBy_Call {
_c.Call.Return(run)
return _c
}
// GetSegmentsBy provides a mock function with given fields: filters
func (_m *MockMetaCache) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo {
_va := make([]interface{}, len(filters))
for _i := range filters {
_va[_i] = filters[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 []*SegmentInfo
if rf, ok := ret.Get(0).(func(...SegmentFilter) []*SegmentInfo); ok {
r0 = rf(filters...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*SegmentInfo)
}
}
return r0
}
// MockMetaCache_GetSegmentsBy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentsBy'
type MockMetaCache_GetSegmentsBy_Call struct {
*mock.Call
}
// GetSegmentsBy is a helper method to define mock.On call
// - filters ...SegmentFilter
func (_e *MockMetaCache_Expecter) GetSegmentsBy(filters ...interface{}) *MockMetaCache_GetSegmentsBy_Call {
return &MockMetaCache_GetSegmentsBy_Call{Call: _e.mock.On("GetSegmentsBy",
append([]interface{}{}, filters...)...)}
}
func (_c *MockMetaCache_GetSegmentsBy_Call) Run(run func(filters ...SegmentFilter)) *MockMetaCache_GetSegmentsBy_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]SegmentFilter, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(SegmentFilter)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockMetaCache_GetSegmentsBy_Call) Return(_a0 []*SegmentInfo) *MockMetaCache_GetSegmentsBy_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMetaCache_GetSegmentsBy_Call) RunAndReturn(run func(...SegmentFilter) []*SegmentInfo) *MockMetaCache_GetSegmentsBy_Call {
_c.Call.Return(run)
return _c
}
// NewSegment provides a mock function with given fields: segmentID, partitionID
func (_m *MockMetaCache) NewSegment(segmentID int64, partitionID int64) {
_m.Called(segmentID, partitionID)
}
// MockMetaCache_NewSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NewSegment'
type MockMetaCache_NewSegment_Call struct {
*mock.Call
}
// NewSegment is a helper method to define mock.On call
// - segmentID int64
// - partitionID int64
func (_e *MockMetaCache_Expecter) NewSegment(segmentID interface{}, partitionID interface{}) *MockMetaCache_NewSegment_Call {
return &MockMetaCache_NewSegment_Call{Call: _e.mock.On("NewSegment", segmentID, partitionID)}
}
func (_c *MockMetaCache_NewSegment_Call) Run(run func(segmentID int64, partitionID int64)) *MockMetaCache_NewSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
})
return _c
}
func (_c *MockMetaCache_NewSegment_Call) Return() *MockMetaCache_NewSegment_Call {
_c.Call.Return()
return _c
}
func (_c *MockMetaCache_NewSegment_Call) RunAndReturn(run func(int64, int64)) *MockMetaCache_NewSegment_Call {
_c.Call.Return(run)
return _c
}
// UpdateSegments provides a mock function with given fields: action, filters
func (_m *MockMetaCache) UpdateSegments(action SegmentAction, filters ...SegmentFilter) {
_va := make([]interface{}, len(filters))
for _i := range filters {
_va[_i] = filters[_i]
}
var _ca []interface{}
_ca = append(_ca, action)
_ca = append(_ca, _va...)
_m.Called(_ca...)
}
// MockMetaCache_UpdateSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegments'
type MockMetaCache_UpdateSegments_Call struct {
*mock.Call
}
// UpdateSegments is a helper method to define mock.On call
// - action SegmentAction
// - filters ...SegmentFilter
func (_e *MockMetaCache_Expecter) UpdateSegments(action interface{}, filters ...interface{}) *MockMetaCache_UpdateSegments_Call {
return &MockMetaCache_UpdateSegments_Call{Call: _e.mock.On("UpdateSegments",
append([]interface{}{action}, filters...)...)}
}
func (_c *MockMetaCache_UpdateSegments_Call) Run(run func(action SegmentAction, filters ...SegmentFilter)) *MockMetaCache_UpdateSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]SegmentFilter, len(args)-1)
for i, a := range args[1:] {
if a != nil {
variadicArgs[i] = a.(SegmentFilter)
}
}
run(args[0].(SegmentAction), variadicArgs...)
})
return _c
}
func (_c *MockMetaCache_UpdateSegments_Call) Return() *MockMetaCache_UpdateSegments_Call {
_c.Call.Return()
return _c
}
func (_c *MockMetaCache_UpdateSegments_Call) RunAndReturn(run func(SegmentAction, ...SegmentFilter)) *MockMetaCache_UpdateSegments_Call {
_c.Call.Return(run)
return _c
}
// NewMockMetaCache creates a new instance of MockMetaCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockMetaCache(t interface {
mock.TestingT
Cleanup(func())
}) *MockMetaCache {
mock := &MockMetaCache{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
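
The expecter helpers above make call setup type-safe. Below is a minimal usage sketch, not part of this commit; the test name and segment ID are hypothetical, while NewMockMetaCache and WithSegmentID come from the code in this change.

package metacache

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// TestMockMetaCacheSketch is a hypothetical test illustrating the expecter API.
func TestMockMetaCacheSketch(t *testing.T) {
	mc := NewMockMetaCache(t) // expectations are asserted via t.Cleanup
	// stub GetSegmentIDsBy for any single filter argument
	mc.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1001})

	ids := mc.GetSegmentIDsBy(WithSegmentID(1001))
	if len(ids) != 1 || ids[0] != 1001 {
		t.Fatalf("unexpected segment ids: %v", ids)
	}
}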


@@ -75,7 +75,7 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
}
}
-func newSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo {
+func NewSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo {
return &SegmentInfo{
segmentID: info.GetID(),
partitionID: info.GetPartitionID(),


@@ -31,8 +31,8 @@ type SegmentSuite struct {
}
func (s *SegmentSuite) TestBasic() {
-bfs := newBloomFilterSet()
-segment := newSegmentInfo(s.info, bfs)
+bfs := NewBloomFilterSet()
+segment := NewSegmentInfo(s.info, bfs)
s.Equal(s.info.GetID(), segment.SegmentID())
s.Equal(s.info.GetPartitionID(), segment.PartitionID())
s.Equal(s.info.GetNumOfRows(), segment.NumOfRows())
@@ -43,8 +43,8 @@ func (s *SegmentSuite) TestBasic() {
}
func (s *SegmentSuite) TestClone() {
-bfs := newBloomFilterSet()
-segment := newSegmentInfo(s.info, bfs)
+bfs := NewBloomFilterSet()
+segment := NewSegmentInfo(s.info, bfs)
cloned := segment.Clone()
s.Equal(segment.SegmentID(), cloned.SegmentID())
s.Equal(segment.PartitionID(), cloned.PartitionID())


@@ -0,0 +1,32 @@
package syncmgr
import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
)
type Task interface {
Run() error
}
type keyLockDispatcher[K comparable] struct {
keyLock *lock.KeyLock[K]
workerPool *conc.Pool[struct{}]
}
func newKeyLockDispatcher[K comparable](maxParallel int) *keyLockDispatcher[K] {
return &keyLockDispatcher[K]{
workerPool: conc.NewPool[struct{}](maxParallel, conc.WithPreAlloc(true)),
keyLock: lock.NewKeyLock[K](),
}
}
func (d *keyLockDispatcher[K]) Submit(key K, t Task) *conc.Future[struct{}] {
d.keyLock.Lock(key)
defer d.keyLock.Unlock(key)
return d.workerPool.Submit(func() (struct{}, error) {
err := t.Run()
return struct{}{}, err
})
}
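
For orientation, here is a minimal sketch (not part of this commit) of driving the dispatcher with a trivial task; printTask and runDispatcherSketch are hypothetical, and it assumes conc.Future exposes the Await accessor used elsewhere in Milvus.

package syncmgr

import "fmt"

type printTask struct{ msg string }

func (t *printTask) Run() error {
	fmt.Println(t.msg)
	return nil
}

// runDispatcherSketch submits one task and blocks until it finishes.
func runDispatcherSketch() error {
	d := newKeyLockDispatcher[int64](4)
	// Submit acquires the key's lock while enqueueing the task onto the worker pool
	f := d.Submit(1001, &printTask{msg: "sync segment 1001"})
	_, err := f.Await() // block until the task's Run returns
	return err
}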


@@ -0,0 +1,83 @@
package syncmgr
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
)
type mockTask struct {
ch chan struct{}
err error
}
func (t *mockTask) done() {
close(t.ch)
}
func (t *mockTask) Run() error {
<-t.ch
return t.err
}
func newMockTask(err error) *mockTask {
return &mockTask{
err: err,
ch: make(chan struct{}),
}
}
type KeyLockDispatcherSuite struct {
suite.Suite
}
func (s *KeyLockDispatcherSuite) TestKeyLock() {
d := newKeyLockDispatcher[int64](2)
t1 := newMockTask(nil)
t2 := newMockTask(nil)
sig := atomic.NewBool(false)
d.Submit(1, t1)
go func() {
d.Submit(1, t2)
sig.Store(true)
}()
s.False(sig.Load(), "task 2 should not be submitted before task 1 is done")
t1.done()
s.Eventually(sig.Load, time.Second, time.Millisecond*100)
}
func (s *KeyLockDispatcherSuite) TestCap() {
d := newKeyLockDispatcher[int64](1)
t1 := newMockTask(nil)
t2 := newMockTask(nil)
sig := atomic.NewBool(false)
d.Submit(1, t1)
go func() {
d.Submit(2, t2)
sig.Store(true)
}()
s.False(sig.Load(), "task 2 should not be submitted before task 1 is done")
t1.done()
s.Eventually(sig.Load, time.Second, time.Millisecond*100)
}
func TestKeyLockDispatcher(t *testing.T) {
suite.Run(t, new(KeyLockDispatcherSuite))
}


@@ -0,0 +1,132 @@
package syncmgr
import (
"context"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
)
// MetaWriter is the interface for SyncManager to write segment sync meta.
type MetaWriter interface {
UpdateSync(*SyncTask) error
}
type brokerMetaWriter struct {
broker broker.Broker
opts []retry.Option
}
func BrokerMetaWriter(broker broker.Broker, opts ...retry.Option) MetaWriter {
return &brokerMetaWriter{
broker: broker,
opts: opts,
}
}
func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
var (
fieldInsert = []*datapb.FieldBinlog{}
fieldStats = []*datapb.FieldBinlog{}
deltaInfos = make([]*datapb.FieldBinlog, 1)
checkPoints = []*datapb.CheckPoint{}
)
for k, v := range pack.insertBinlogs {
fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}})
}
for k, v := range pack.statsBinlogs {
fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}})
}
deltaInfos[0] = &datapb.FieldBinlog{Binlogs: []*datapb.Binlog{pack.deltaBinlog}}
// only include checkpoint info for the current segment
segments := pack.metacache.GetSegmentsBy(metacache.WithSegmentID(pack.segmentID))
if len(segments) == 0 {
return merr.WrapErrSegmentNotFound(pack.segmentID)
}
segment := segments[0]
checkPoints = append(checkPoints, &datapb.CheckPoint{
SegmentID: pack.segmentID,
NumOfRows: segment.NumOfRows(), //+ pack.option.Row,
Position: pack.checkpoint,
})
startPos := lo.Map(pack.metacache.GetSegmentsBy(metacache.WithStartPosNotRecorded()), func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition {
return &datapb.SegmentStartPosition{
SegmentID: info.SegmentID(),
StartPosition: info.StartPosition(),
}
})
log.Info("SaveBinlogPath",
zap.Int64("SegmentID", pack.segmentID),
zap.Int64("CollectionID", pack.collectionID),
zap.Any("startPos", startPos),
zap.Any("checkPoints", checkPoints),
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
zap.Int("Length of Field2Stats", len(fieldStats)),
zap.Int("Length of Field2Deltalogs", len(deltaInfos[0].GetBinlogs())),
zap.String("vChannelName", pack.channelName),
)
req := &datapb.SaveBinlogPathsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(0),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
SegmentID: pack.segmentID,
CollectionID: pack.collectionID,
Field2BinlogPaths: fieldInsert,
Field2StatslogPaths: fieldStats,
Deltalogs: deltaInfos,
CheckPoints: checkPoints,
StartPositions: startPos,
Flushed: pack.isFlush,
// Dropped: pack.option.isDrop,
Channel: pack.channelName,
}
err := retry.Do(context.Background(), func() error {
err := b.broker.SaveBinlogPaths(context.Background(), req)
// Segment not found during stale segment flush. The segment might have been compacted already.
// Stop retry and still proceed to the end, ignoring this error.
if !pack.isFlush && errors.Is(err, merr.ErrSegmentNotFound) {
log.Warn("stale segment not found, could be compacted",
zap.Int64("segmentID", pack.segmentID))
log.Warn("failed to SaveBinlogPaths",
zap.Int64("segmentID", pack.segmentID),
zap.Error(err))
return nil
}
// meta error: the virtual channel this datanode handles no longer belongs to it
if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrChannelNotFound) {
log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", pack.channelName))
return nil
}
if err != nil {
return err
}
return nil
}, b.opts...)
if err != nil {
log.Warn("failed to SaveBinlogPaths",
zap.Int64("segmentID", pack.segmentID),
zap.Error(err))
}
return err
}
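
Retry behavior for SaveBinlogPaths is injected through the variadic options. A hypothetical construction (not part of this commit), assuming the Attempts and Sleep options from pkg/util/retry:

package syncmgr

import (
	"time"

	"github.com/milvus-io/milvus/internal/datanode/broker"
	"github.com/milvus-io/milvus/pkg/util/retry"
)

// newMetaWriterSketch is a hypothetical helper showing how retry options compose.
func newMetaWriterSketch(br broker.Broker) MetaWriter {
	// retry SaveBinlogPaths up to 5 times, sleeping 200ms between attempts
	return BrokerMetaWriter(br, retry.Attempts(5), retry.Sleep(200*time.Millisecond))
}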


@@ -0,0 +1,64 @@
package syncmgr
import (
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
)
type MetaWriterSuite struct {
suite.Suite
broker *broker.MockBroker
metacache *metacache.MockMetaCache
writer MetaWriter
}
func (s *MetaWriterSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
}
func (s *MetaWriterSuite) SetupTest() {
s.broker = broker.NewMockBroker(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.writer = BrokerMetaWriter(s.broker, retry.Attempts(1))
}
func (s *MetaWriterSuite) TestNormalSave() {
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
task := NewSyncTask()
task.WithMetaCache(s.metacache)
err := s.writer.UpdateSync(task)
s.NoError(err)
}
func (s *MetaWriterSuite) TestReturnError() {
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked"))
bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
task := NewSyncTask()
task.WithMetaCache(s.metacache)
err := s.writer.UpdateSync(task)
s.Error(err)
}
func TestMetaWriter(t *testing.T) {
suite.Run(t, new(MetaWriterSuite))
}


@@ -0,0 +1,101 @@
package syncmgr
import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func NewSyncTask() *SyncTask {
return &SyncTask{
isFlush: false,
insertBinlogs: make(map[int64]*datapb.Binlog),
statsBinlogs: make(map[int64]*datapb.Binlog),
segmentData: make(map[string][]byte),
}
}
func (t *SyncTask) WithChunkManager(cm storage.ChunkManager) *SyncTask {
t.chunkManager = cm
return t
}
func (t *SyncTask) WithAllocator(allocator allocator.Interface) *SyncTask {
t.allocator = allocator
return t
}
func (t *SyncTask) WithInsertData(insertData *storage.InsertData) *SyncTask {
t.insertData = insertData
return t
}
func (t *SyncTask) WithDeleteData(deleteData *storage.DeleteData) *SyncTask {
t.deleteData = deleteData
return t
}
func (t *SyncTask) WithCheckpoint(cp *msgpb.MsgPosition) *SyncTask {
t.checkpoint = cp
return t
}
func (t *SyncTask) WithCollectionID(collID int64) *SyncTask {
t.collectionID = collID
return t
}
func (t *SyncTask) WithPartitionID(partID int64) *SyncTask {
t.partitionID = partID
return t
}
func (t *SyncTask) WithSegmentID(segID int64) *SyncTask {
t.segmentID = segID
return t
}
func (t *SyncTask) WithChannelName(chanName string) *SyncTask {
t.channelName = chanName
return t
}
func (t *SyncTask) WithSchema(schema *schemapb.CollectionSchema) *SyncTask {
t.schema = schema
return t
}
func (t *SyncTask) WithTimeRange(from, to typeutil.Timestamp) *SyncTask {
t.tsFrom, t.tsTo = from, to
return t
}
func (t *SyncTask) WithFlush() *SyncTask {
t.isFlush = true
return t
}
func (t *SyncTask) WithMetaCache(metacache metacache.MetaCache) *SyncTask {
t.metacache = metacache
return t
}
func (t *SyncTask) WithMetaWriter(metaWriter MetaWriter) *SyncTask {
t.metaWriter = metaWriter
return t
}
func (t *SyncTask) WithWriteRetryOptions(opts ...retry.Option) *SyncTask {
t.writeRetryOpts = opts
return t
}
func (t *SyncTask) WithFailureCallback(callback func(error)) *SyncTask {
t.failureCallback = callback
return t
}


@@ -0,0 +1,64 @@
package syncmgr
import (
"context"
"strconv"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type SyncManagerOption struct {
chunkManager storage.ChunkManager
allocator allocator.Interface
parallelTask int
}
type SyncMeta struct {
collectionID int64
partitionID int64
segmentID int64
channelName string
schema *schemapb.CollectionSchema
checkpoint *msgpb.MsgPosition
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
metacache metacache.MetaCache
}
type SyncManager interface {
SyncData(ctx context.Context, task *SyncTask) error
}
type syncManager struct {
*keyLockDispatcher[int64]
chunkManager storage.ChunkManager
allocator allocator.Interface
}
func NewSyncManager(parallelTask int, chunkManager storage.ChunkManager, allocator allocator.Interface) (SyncManager, error) {
if parallelTask < 1 {
return nil, merr.WrapErrParameterInvalid("positive parallel task number", strconv.FormatInt(int64(parallelTask), 10))
}
return &syncManager{
keyLockDispatcher: newKeyLockDispatcher[int64](parallelTask),
chunkManager: chunkManager,
allocator: allocator,
}, nil
}
func (mgr syncManager) SyncData(ctx context.Context, task *SyncTask) error {
task.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager)
// make syncs for the same segment execute in sequence
// if a previous sync task has not finished, block here
mgr.Submit(task.segmentID, task)
return nil
}
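
Putting the pieces together, a hypothetical wiring sketch (not part of this commit; startSegmentSync and its parameters are illustrative) built only from the APIs introduced above:

package syncmgr

import (
	"context"

	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/datanode/broker"
	"github.com/milvus-io/milvus/internal/storage"
)

// startSegmentSync constructs a SyncManager and submits one sync task.
func startSegmentSync(ctx context.Context, cm storage.ChunkManager, alloc allocator.Interface, br broker.Broker, segmentID int64) error {
	mgr, err := NewSyncManager(16, cm, alloc) // up to 16 concurrent sync tasks
	if err != nil {
		return err
	}
	task := NewSyncTask().
		WithSegmentID(segmentID).
		WithMetaWriter(BrokerMetaWriter(br))
	// tasks for the same segment ID are dispatched under the same key lock
	return mgr.SyncData(ctx, task)
}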


@@ -0,0 +1,177 @@
package syncmgr
import (
"context"
"math/rand"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type SyncManagerSuite struct {
suite.Suite
collectionID int64
partitionID int64
segmentID int64
channelName string
metacache *metacache.MockMetaCache
allocator *allocator.MockGIDAllocator
schema *schemapb.CollectionSchema
chunkManager *mocks.ChunkManager
broker *broker.MockBroker
}
func (s *SyncManagerSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collectionID = 100
s.partitionID = 101
s.segmentID = 1001
s.channelName = "by-dev-rootcoord-dml_0_100v0"
s.schema = &schemapb.CollectionSchema{
Name: "sync_task_test_col",
Fields: []*schemapb.FieldSchema{
{FieldID: common.RowIDField, DataType: schemapb.DataType_Int64},
{FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64},
{
FieldID: 100,
Name: "pk",
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
},
{
FieldID: 101,
Name: "vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
}
func (s *SyncManagerSuite) SetupTest() {
s.allocator = allocator.NewMockGIDAllocator()
s.allocator.AllocF = func(count uint32) (int64, int64, error) {
return time.Now().Unix(), int64(count), nil
}
s.allocator.AllocOneF = func() (allocator.UniqueID, error) {
return time.Now().Unix(), nil
}
s.chunkManager = mocks.NewChunkManager(s.T())
s.chunkManager.EXPECT().RootPath().Return("files").Maybe()
s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker = broker.NewMockBroker(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
}
func (s *SyncManagerSuite) getEmptyInsertBuffer() *storage.InsertData {
buf, err := storage.NewInsertData(s.schema)
s.Require().NoError(err)
return buf
}
func (s *SyncManagerSuite) getInsertBuffer() *storage.InsertData {
buf := s.getEmptyInsertBuffer()
// generate data
for i := 0; i < 10; i++ {
data := make(map[storage.FieldID]any)
data[common.RowIDField] = int64(i + 1)
data[common.TimeStampField] = int64(i + 1)
data[100] = int64(i + 1)
vector := lo.RepeatBy(128, func(_ int) float32 {
return rand.Float32()
})
data[101] = vector
err := buf.Append(data)
s.Require().NoError(err)
}
return buf
}
func (s *SyncManagerSuite) getDeleteBuffer() *storage.DeleteData {
buf := &storage.DeleteData{}
for i := 0; i < 10; i++ {
pk := storage.NewInt64PrimaryKey(int64(i + 1))
ts := tsoutil.ComposeTSByTime(time.Now(), 0)
buf.Append(pk, ts)
}
return buf
}
func (s *SyncManagerSuite) getDeleteBufferZeroTs() *storage.DeleteData {
buf := &storage.DeleteData{}
for i := 0; i < 10; i++ {
pk := storage.NewInt64PrimaryKey(int64(i + 1))
buf.Append(pk, 0)
}
return buf
}
func (s *SyncManagerSuite) getSuiteSyncTask() *SyncTask {
task := NewSyncTask().WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithSegmentID(s.segmentID).
WithChannelName(s.channelName).
WithSchema(s.schema).
WithChunkManager(s.chunkManager).
WithAllocator(s.allocator).
WithMetaCache(s.metacache)
return task
}
func (s *SyncManagerSuite) TestSubmit() {
sig := make(chan struct{})
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Run(func(_ context.Context, _ *datapb.SaveBinlogPathsRequest) {
close(sig)
}).Return(nil)
bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
s.NoError(err)
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err = manager.SyncData(context.Background(), task)
s.NoError(err)
<-sig
}
func TestSyncManager(t *testing.T) {
suite.Run(t, new(SyncManagerSuite))
}


@@ -0,0 +1,319 @@
package syncmgr
import (
"context"
"path"
"strconv"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type SyncTask struct {
chunkManager storage.ChunkManager
allocator allocator.Interface
insertData *storage.InsertData
deleteData *storage.DeleteData
collectionID int64
partitionID int64
segmentID int64
channelName string
schema *schemapb.CollectionSchema
checkpoint *msgpb.MsgPosition
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
isFlush bool
metacache metacache.MetaCache
metaWriter MetaWriter
insertBinlogs map[int64]*datapb.Binlog
statsBinlogs map[int64]*datapb.Binlog
deltaBinlog *datapb.Binlog
segmentData map[string][]byte
writeRetryOpts []retry.Option
failureCallback func(err error)
}
func (t *SyncTask) getLogger() *log.MLogger {
return log.Ctx(context.Background()).With(
zap.Int64("collectionID", t.collectionID),
zap.Int64("partitionID", t.partitionID),
zap.Int64("segmentID", t.segmentID),
zap.String("channel", t.channelName),
)
}
func (t *SyncTask) handleError(err error) {
if t.failureCallback != nil {
t.failureCallback(err)
}
}
func (t *SyncTask) Run() error {
log := t.getLogger()
var err error
err = t.serializeInsertData()
if err != nil {
log.Warn("failed to serialize insert data", zap.Error(err))
t.handleError(err)
return err
}
err = t.serializeDeleteData()
if err != nil {
log.Warn("failed to serialize delete data", zap.Error(err))
t.handleError(err)
return err
}
err = t.writeLogs()
if err != nil {
log.Warn("failed to save serialized data into storage", zap.Error(err))
t.handleError(err)
return err
}
if t.metaWriter != nil {
err = t.writeMeta()
if err != nil {
log.Warn("failed to save serialized data into storage", zap.Error(err))
t.handleError(err)
return err
}
}
log.Warn("task done")
return nil
}
func (t *SyncTask) serializeInsertData() error {
err := t.serializeBinlog()
if err != nil {
return err
}
err = t.serializePkStatsLog()
if err != nil {
return err
}
return nil
}
func (t *SyncTask) serializeDeleteData() error {
if t.deleteData == nil {
return nil
}
delCodec := storage.NewDeleteCodec()
blob, err := delCodec.Serialize(t.collectionID, t.partitionID, t.segmentID, t.deleteData)
if err != nil {
return err
}
logID, err := t.allocator.AllocOne()
if err != nil {
log.Error("failed to alloc ID", zap.Error(err))
return err
}
value := blob.GetValue()
data := &datapb.Binlog{}
blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, logID)
blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
t.segmentData[blobPath] = value
data.LogSize = int64(len(blob.Value))
data.LogPath = blobPath
t.deltaBinlog = data
return nil
}
func (t *SyncTask) serializeBinlog() error {
if t.insertData == nil {
return nil
}
// get memory size of buffer data
memSize := make(map[int64]int)
for fieldID, fieldData := range t.insertData.Data {
memSize[fieldID] = fieldData.GetMemorySize()
}
inCodec := t.getInCodec()
blobs, err := inCodec.Serialize(t.partitionID, t.segmentID, t.insertData)
if err != nil {
return err
}
logidx, _, err := t.allocator.Alloc(uint32(len(blobs)))
if err != nil {
return err
}
for _, blob := range blobs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return err
}
k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, logidx)
// [rootPath]/[insert_log]/key
key := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, k)
t.segmentData[key] = blob.GetValue()
t.insertBinlogs[fieldID] = &datapb.Binlog{
EntriesNum: blob.RowNum,
TimestampFrom: t.tsFrom,
TimestampTo: t.tsTo,
LogPath: key,
LogSize: int64(memSize[fieldID]),
}
logidx += 1
}
return nil
}
func (t *SyncTask) convertInsertData2PkStats(pkFieldID int64, dataType schemapb.DataType) (*storage.PrimaryKeyStats, int64) {
pkFieldData := t.insertData.Data[pkFieldID]
rowNum := int64(pkFieldData.RowNum())
if rowNum == 0 {
return nil, 0
}
stats := storage.NewPrimaryKeyStats(pkFieldID, int64(dataType), rowNum)
stats.UpdateByMsgs(pkFieldData)
return stats, rowNum
}
func (t *SyncTask) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error {
blob, err := t.getInCodec().SerializePkStats(stats, rowNum)
if err != nil {
return err
}
logidx, err := t.allocator.AllocOne()
if err != nil {
return err
}
t.convertBlob2StatsBinlog(blob, fieldID, logidx, rowNum)
return nil
}
func (t *SyncTask) serializeMergedPkStats(fieldID int64, stats *storage.PrimaryKeyStats, rowNum int64) error {
segments := t.metacache.GetSegmentsBy(metacache.WithSegmentID(t.segmentID))
var statsList []*storage.PrimaryKeyStats
var oldRowNum int64
for _, segment := range segments {
oldRowNum += segment.NumOfRows()
statsList = append(statsList, lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats {
return &storage.PrimaryKeyStats{
FieldID: fieldID,
MaxPk: pks.MaxPK,
MinPk: pks.MinPK,
BF: pks.PkFilter,
}
})...)
}
if stats != nil {
statsList = append(statsList, stats)
}
blob, err := t.getInCodec().SerializePkStatsList(statsList, oldRowNum+rowNum)
if err != nil {
return err
}
t.convertBlob2StatsBinlog(blob, fieldID, int64(storage.CompoundStatsType), oldRowNum+rowNum)
return nil
}
func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID int64, rowNum int64) {
key := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, logID)
key = path.Join(t.chunkManager.RootPath(), common.SegmentStatslogPath, key)
value := blob.GetValue()
t.segmentData[key] = value
t.statsBinlogs[fieldID] = &datapb.Binlog{
EntriesNum: rowNum,
TimestampFrom: t.tsFrom,
TimestampTo: t.tsTo,
LogPath: key,
LogSize: int64(len(value)),
}
}
func (t *SyncTask) serializePkStatsLog() error {
if t.insertData == nil {
return nil
}
pkField := lo.FindOrElse(t.schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() })
if pkField == nil {
return merr.WrapErrServiceInternal("cannot find pk field")
}
fieldID := pkField.GetFieldID()
stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType())
// not a flush and no insert data, no stats to serialize
if !t.isFlush && stats == nil {
return nil
}
if t.isFlush {
return t.serializeMergedPkStats(fieldID, stats, rowNum)
}
return t.serializeSinglePkStats(fieldID, stats, rowNum)
}
// writeLogs writes log files (binlog/deltalog/statslog) into storage via chunkManager.
func (t *SyncTask) writeLogs() error {
return retry.Do(context.Background(), func() error {
return t.chunkManager.MultiWrite(context.Background(), t.segmentData)
}, t.writeRetryOpts...)
}
// writeMeta updates segment meta via the meta writer set in the task options.
func (t *SyncTask) writeMeta() error {
return t.metaWriter.UpdateSync(t)
}
func (t *SyncTask) getInCodec() *storage.InsertCodec {
meta := &etcdpb.CollectionMeta{
Schema: t.schema,
ID: t.collectionID,
}
return storage.NewInsertCodecWithSchema(meta)
}
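
The object keys produced by serializeBinlog and friends follow a fixed layout: [rootPath]/[log type]/collectionID/partitionID/segmentID/fieldID/logID. A small illustration (not part of this commit; the IDs are made up, and it assumes common.SegmentInsertLogPath is the usual insert_log prefix):

package syncmgr

import (
	"path"

	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/util/metautil"
)

// insertLogKeySketch illustrates the blob key layout used by serializeBinlog.
func insertLogKeySketch(rootPath string) string {
	// e.g. "files/insert_log/100/101/1001/102/5" when rootPath is "files"
	k := metautil.JoinIDPath(100, 101, 1001, 102, 5)
	return path.Join(rootPath, common.SegmentInsertLogPath, k)
}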


@@ -0,0 +1,262 @@
package syncmgr
import (
"math/rand"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type SyncTaskSuite struct {
suite.Suite
collectionID int64
partitionID int64
segmentID int64
channelName string
metacache *metacache.MockMetaCache
allocator *allocator.MockGIDAllocator
schema *schemapb.CollectionSchema
chunkManager *mocks.ChunkManager
broker *broker.MockBroker
}
func (s *SyncTaskSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collectionID = 100
s.partitionID = 101
s.segmentID = 1001
s.channelName = "by-dev-rootcoord-dml_0_100v0"
s.schema = &schemapb.CollectionSchema{
Name: "sync_task_test_col",
Fields: []*schemapb.FieldSchema{
{FieldID: common.RowIDField, DataType: schemapb.DataType_Int64},
{FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64},
{
FieldID: 100,
Name: "pk",
DataType: schemapb.DataType_Int64,
IsPrimaryKey: true,
},
{
FieldID: 101,
Name: "vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
}
func (s *SyncTaskSuite) SetupTest() {
s.allocator = allocator.NewMockGIDAllocator()
s.allocator.AllocF = func(count uint32) (int64, int64, error) {
return time.Now().Unix(), int64(count), nil
}
s.allocator.AllocOneF = func() (allocator.UniqueID, error) {
return time.Now().Unix(), nil
}
s.chunkManager = mocks.NewChunkManager(s.T())
s.chunkManager.EXPECT().RootPath().Return("files").Maybe()
s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker = broker.NewMockBroker(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
}
func (s *SyncTaskSuite) getEmptyInsertBuffer() *storage.InsertData {
buf, err := storage.NewInsertData(s.schema)
s.Require().NoError(err)
return buf
}
func (s *SyncTaskSuite) getInsertBuffer() *storage.InsertData {
buf := s.getEmptyInsertBuffer()
// generate data
for i := 0; i < 10; i++ {
data := make(map[storage.FieldID]any)
data[common.RowIDField] = int64(i + 1)
data[common.TimeStampField] = int64(i + 1)
data[100] = int64(i + 1)
vector := lo.RepeatBy(128, func(_ int) float32 {
return rand.Float32()
})
data[101] = vector
err := buf.Append(data)
s.Require().NoError(err)
}
return buf
}
func (s *SyncTaskSuite) getDeleteBuffer() *storage.DeleteData {
buf := &storage.DeleteData{}
for i := 0; i < 10; i++ {
pk := storage.NewInt64PrimaryKey(int64(i + 1))
ts := tsoutil.ComposeTSByTime(time.Now(), 0)
buf.Append(pk, ts)
}
return buf
}
func (s *SyncTaskSuite) getDeleteBufferZeroTs() *storage.DeleteData {
buf := &storage.DeleteData{}
for i := 0; i < 10; i++ {
pk := storage.NewInt64PrimaryKey(int64(i + 1))
buf.Append(pk, 0)
}
return buf
}
func (s *SyncTaskSuite) getSuiteSyncTask() *SyncTask {
task := NewSyncTask().WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithSegmentID(s.segmentID).
WithChannelName(s.channelName).
WithSchema(s.schema).
WithChunkManager(s.chunkManager).
WithAllocator(s.allocator).
WithMetaCache(s.metacache)
return task
}
func (s *SyncTaskSuite) TestRunNormal() {
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{
FieldID: 101,
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
s.Require().NoError(err)
ids := []int64{1, 2, 3, 4, 5, 6, 7}
for _, id := range ids {
err = fd.AppendRow(id)
s.Require().NoError(err)
}
bfs.UpdatePKRange(fd)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.Run("without_insert_delete", func() {
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err := task.Run()
s.NoError(err)
})
s.Run("with_insert_delete_cp", func() {
task := s.getSuiteSyncTask()
task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer())
task.WithTimeRange(50, 100)
task.WithMetaWriter(BrokerMetaWriter(s.broker))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err := task.Run()
s.NoError(err)
})
s.Run("with_insert_delete_flush", func() {
task := s.getSuiteSyncTask()
task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer())
task.WithFlush()
task.WithMetaWriter(BrokerMetaWriter(s.broker))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err := task.Run()
s.NoError(err)
})
}
func (s *SyncTaskSuite) TestRunError() {
s.Run("serialize_insert_fail", func() {
flag := false
handler := func(_ error) { flag = true }
task := s.getSuiteSyncTask().WithFailureCallback(handler)
task.WithInsertData(s.getEmptyInsertBuffer())
err := task.Run()
s.Error(err)
s.True(flag)
})
s.Run("serailize_delete_fail", func() {
flag := false
handler := func(_ error) { flag = true }
task := s.getSuiteSyncTask().WithFailureCallback(handler)
task.WithDeleteData(s.getDeleteBufferZeroTs())
err := task.Run()
s.Error(err)
s.True(flag)
})
s.Run("chunk_manager_save_fail", func() {
flag := false
handler := func(_ error) { flag = true }
s.chunkManager.ExpectedCalls = nil
s.chunkManager.EXPECT().RootPath().Return("files")
s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(errors.New("mocked"))
task := s.getSuiteSyncTask().WithFailureCallback(handler)
task.WithInsertData(s.getInsertBuffer()).WithDeleteData(s.getDeleteBuffer())
task.WithWriteRetryOptions(retry.Attempts(1))
err := task.Run()
s.Error(err)
s.True(flag)
})
}
func TestSyncTask(t *testing.T) {
suite.Run(t, new(SyncTaskSuite))
}