feat: trigger compaction to handle index version (#28442)

issue: https://github.com/milvus-io/milvus/issues/28441

---------

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/28424/head
Enwei Jiao 2023-11-21 09:26:22 +08:00 committed by GitHub
parent e88bbaac24
commit 7445d3711c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 403 additions and 49 deletions

View File

@ -426,6 +426,7 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=Handler --dir=internal/datacoord --filename=mock_handler.go --output=internal/datacoord --structname=NMockHandler --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=allocator --dir=internal/datacoord --filename=mock_allocator_test.go --output=internal/datacoord --structname=NMockAllocator --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=RWChannelStore --dir=internal/datacoord --filename=mock_channel_store.go --output=internal/datacoord --structname=MockRWChannelStore --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=IndexEngineVersionManager --dir=internal/datacoord --filename=mock_index_engine_version_manager.go --output=internal/datacoord --structname=MockVersionManager --with-expecter --inpackage
generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage

View File

@ -72,8 +72,9 @@ type compactionTrigger struct {
forceMu sync.Mutex
quit chan struct{}
wg sync.WaitGroup
// segRefer *SegmentReferenceManager
// indexCoord types.IndexCoord
indexEngineVersionManager IndexEngineVersionManager
estimateNonDiskSegmentPolicy calUpperLimitPolicy
estimateDiskSegmentPolicy calUpperLimitPolicy
// A sloopy hack, so we can test with different segment row count without worrying that
@ -85,17 +86,15 @@ func newCompactionTrigger(
meta *meta,
compactionHandler compactionPlanContext,
allocator allocator,
// segRefer *SegmentReferenceManager,
// indexCoord types.IndexCoord,
handler Handler,
indexVersionManager IndexEngineVersionManager,
) *compactionTrigger {
return &compactionTrigger{
meta: meta,
allocator: allocator,
signals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
// segRefer: segRefer,
// indexCoord: indexCoord,
meta: meta,
allocator: allocator,
signals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
indexEngineVersionManager: indexVersionManager,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
handler: handler,
@ -908,6 +907,20 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
return true
}
// index version of segment lower than current version and IndexFileKeys should have value, trigger compaction
for _, index := range segment.segmentIndexes {
if index.CurrentIndexVersion < t.indexEngineVersionManager.GetCurrentIndexEngineVersion() &&
len(index.IndexFileKeys) > 0 {
log.Info("index version is too old, trigger compaction",
zap.Int64("segmentID", segment.ID),
zap.Int64("indexID", index.IndexID),
zap.Strings("indexFileKeys", index.IndexFileKeys),
zap.Int32("currentIndexVersion", index.CurrentIndexVersion),
zap.Int32("currentEngineVersion", t.indexEngineVersionManager.GetCurrentIndexEngineVersion()))
return true
}
}
return false
}

View File

@ -73,6 +73,10 @@ func (h *spyCompactionHandler) start() {}
func (h *spyCompactionHandler) stop() {}
func newMockVersionManager() IndexEngineVersionManager {
return &versionManagerImpl{}
}
var _ compactionPlanContext = (*spyCompactionHandler)(nil)
func Test_compactionTrigger_force(t *testing.T) {
@ -1294,6 +1298,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
testingOnly: true,
@ -1471,6 +1476,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
testingOnly: true,
@ -1629,13 +1635,14 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
testingOnly: true,
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
testingOnly: true,
}
tr.start()
defer tr.stop()
@ -1678,7 +1685,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
// Test shouldDoSingleCompaction
func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler(), newIndexEngineVersionManager())
// Test too many deltalogs.
var binlogs []*datapb.FieldBinlog
@ -1816,6 +1823,77 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
// deltalog is large enough, should do compaction
couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{})
assert.True(t, couldDo)
mockVersionManager := NewMockVersionManager(t)
mockVersionManager.On("GetCurrentIndexEngineVersion", mock.Anything).Return(int32(2), nil)
trigger.indexEngineVersionManager = mockVersionManager
info4 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 600,
NumOfRows: 10000,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs2,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
101: {
CurrentIndexVersion: 1,
IndexFileKeys: []string{"index1"},
},
},
}
info5 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 600,
NumOfRows: 10000,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs2,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
101: {
CurrentIndexVersion: 2,
IndexFileKeys: []string{"index1"},
},
},
}
info6 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 600,
NumOfRows: 10000,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs2,
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
101: {
CurrentIndexVersion: 1,
IndexFileKeys: nil,
},
},
}
// expire time < Timestamp To, but index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex
couldDo = trigger.ShouldDoSingleCompaction(info4, false, &compactTime{expireTime: 300})
assert.True(t, couldDo)
// expire time < Timestamp To, and index engine version is 2 which is equal CurrentIndexVersion in segmentIndex
couldDo = trigger.ShouldDoSingleCompaction(info5, false, &compactTime{expireTime: 300})
assert.False(t, couldDo)
// expire time < Timestamp To, and index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex but indexFileKeys is nil
couldDo = trigger.ShouldDoSingleCompaction(info6, false, &compactTime{expireTime: 300})
assert.False(t, couldDo)
}
func Test_compactionTrigger_new(t *testing.T) {
@ -1839,7 +1917,7 @@ func Test_compactionTrigger_new(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator, newMockHandler())
got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator, newMockHandler(), newMockVersionManager())
assert.Equal(t, tt.args.meta, got.meta)
assert.Equal(t, tt.args.compactionHandler, got.compactionHandler)
assert.Equal(t, tt.args.allocator, got.allocator)
@ -1848,7 +1926,7 @@ func Test_compactionTrigger_new(t *testing.T) {
}
func Test_compactionTrigger_handleSignal(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler())
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager())
signal := &compactionSignal{
segmentID: 1,
}
@ -1858,12 +1936,12 @@ func Test_compactionTrigger_handleSignal(t *testing.T) {
}
func Test_compactionTrigger_allocTs(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler())
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, newMockAllocator(), newMockHandler(), newMockVersionManager())
ts, err := got.allocTs()
assert.NoError(t, err)
assert.True(t, ts > 0)
got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, &FailsAllocator{}, newMockHandler())
got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{scheduler: newScheduler()}, &FailsAllocator{}, newMockHandler(), newMockVersionManager())
ts, err = got.allocTs()
assert.Error(t, err)
assert.Equal(t, uint64(0), ts)
@ -1895,7 +1973,7 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) {
&Server{
meta: m,
},
})
}, newMockVersionManager())
coll := &collectionInfo{
ID: 1,
Schema: newTestSchema(),
@ -1925,6 +2003,7 @@ type CompactionTriggerSuite struct {
allocator *NMockAllocator
handler *NMockHandler
compactionHandler *MockCompactionPlanContext
versionManager *MockVersionManager
}
func (s *CompactionTriggerSuite) SetupSuite() {
@ -2046,11 +2125,13 @@ func (s *CompactionTriggerSuite) SetupTest() {
s.allocator = NewNMockAllocator(s.T())
s.compactionHandler = NewMockCompactionPlanContext(s.T())
s.handler = NewNMockHandler(s.T())
s.versionManager = NewMockVersionManager(s.T())
s.tr = newCompactionTrigger(
s.meta,
s.compactionHandler,
s.allocator,
s.handler,
s.versionManager,
)
s.tr.testingOnly = true
}

View File

@ -80,14 +80,14 @@ type indexBuilder struct {
policy buildIndexPolicy
nodeManager *IndexNodeManager
chunkManager storage.ChunkManager
indexEngineVersionManager *IndexEngineVersionManager
indexEngineVersionManager IndexEngineVersionManager
}
func newIndexBuilder(
ctx context.Context,
metaTable *meta, nodeManager *IndexNodeManager,
chunkManager storage.ChunkManager,
indexEngineVersionManager *IndexEngineVersionManager,
indexEngineVersionManager IndexEngineVersionManager,
) *indexBuilder {
ctx, cancel := context.WithCancel(ctx)

View File

@ -10,18 +10,28 @@ import (
"github.com/milvus-io/milvus/pkg/log"
)
type IndexEngineVersionManager struct {
type IndexEngineVersionManager interface {
Startup(sessions map[string]*sessionutil.Session)
AddNode(session *sessionutil.Session)
RemoveNode(session *sessionutil.Session)
Update(session *sessionutil.Session)
GetCurrentIndexEngineVersion() int32
GetMinimalIndexEngineVersion() int32
}
type versionManagerImpl struct {
mu sync.Mutex
versions map[int64]sessionutil.IndexEngineVersion
}
func newIndexEngineVersionManager() *IndexEngineVersionManager {
return &IndexEngineVersionManager{
func newIndexEngineVersionManager() IndexEngineVersionManager {
return &versionManagerImpl{
versions: map[int64]sessionutil.IndexEngineVersion{},
}
}
func (m *IndexEngineVersionManager) Startup(sessions map[string]*sessionutil.Session) {
func (m *versionManagerImpl) Startup(sessions map[string]*sessionutil.Session) {
m.mu.Lock()
defer m.mu.Unlock()
@ -30,33 +40,33 @@ func (m *IndexEngineVersionManager) Startup(sessions map[string]*sessionutil.Ses
}
}
func (m *IndexEngineVersionManager) AddNode(session *sessionutil.Session) {
func (m *versionManagerImpl) AddNode(session *sessionutil.Session) {
m.mu.Lock()
defer m.mu.Unlock()
m.addOrUpdate(session)
}
func (m *IndexEngineVersionManager) RemoveNode(session *sessionutil.Session) {
func (m *versionManagerImpl) RemoveNode(session *sessionutil.Session) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.versions, session.ServerID)
}
func (m *IndexEngineVersionManager) Update(session *sessionutil.Session) {
func (m *versionManagerImpl) Update(session *sessionutil.Session) {
m.mu.Lock()
defer m.mu.Unlock()
m.addOrUpdate(session)
}
func (m *IndexEngineVersionManager) addOrUpdate(session *sessionutil.Session) {
func (m *versionManagerImpl) addOrUpdate(session *sessionutil.Session) {
log.Info("addOrUpdate version", zap.Int64("nodeId", session.ServerID), zap.Int32("minimal", session.IndexEngineVersion.MinimalIndexVersion), zap.Int32("current", session.IndexEngineVersion.CurrentIndexVersion))
m.versions[session.ServerID] = session.IndexEngineVersion
}
func (m *IndexEngineVersionManager) GetCurrentIndexEngineVersion() int32 {
func (m *versionManagerImpl) GetCurrentIndexEngineVersion() int32 {
m.mu.Lock()
defer m.mu.Unlock()
@ -75,7 +85,7 @@ func (m *IndexEngineVersionManager) GetCurrentIndexEngineVersion() int32 {
return current
}
func (m *IndexEngineVersionManager) GetMinimalIndexEngineVersion() int32 {
func (m *versionManagerImpl) GetMinimalIndexEngineVersion() int32 {
m.mu.Lock()
defer m.mu.Unlock()

View File

@ -0,0 +1,249 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package datacoord
import (
sessionutil "github.com/milvus-io/milvus/internal/util/sessionutil"
mock "github.com/stretchr/testify/mock"
)
// MockVersionManager is an autogenerated mock type for the IndexEngineVersionManager type
type MockVersionManager struct {
mock.Mock
}
type MockVersionManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockVersionManager) EXPECT() *MockVersionManager_Expecter {
return &MockVersionManager_Expecter{mock: &_m.Mock}
}
// AddNode provides a mock function with given fields: session
func (_m *MockVersionManager) AddNode(session *sessionutil.Session) {
_m.Called(session)
}
// MockVersionManager_AddNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddNode'
type MockVersionManager_AddNode_Call struct {
*mock.Call
}
// AddNode is a helper method to define mock.On call
// - session *sessionutil.Session
func (_e *MockVersionManager_Expecter) AddNode(session interface{}) *MockVersionManager_AddNode_Call {
return &MockVersionManager_AddNode_Call{Call: _e.mock.On("AddNode", session)}
}
func (_c *MockVersionManager_AddNode_Call) Run(run func(session *sessionutil.Session)) *MockVersionManager_AddNode_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*sessionutil.Session))
})
return _c
}
func (_c *MockVersionManager_AddNode_Call) Return() *MockVersionManager_AddNode_Call {
_c.Call.Return()
return _c
}
func (_c *MockVersionManager_AddNode_Call) RunAndReturn(run func(*sessionutil.Session)) *MockVersionManager_AddNode_Call {
_c.Call.Return(run)
return _c
}
// GetCurrentIndexEngineVersion provides a mock function with given fields:
func (_m *MockVersionManager) GetCurrentIndexEngineVersion() int32 {
ret := _m.Called()
var r0 int32
if rf, ok := ret.Get(0).(func() int32); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int32)
}
return r0
}
// MockVersionManager_GetCurrentIndexEngineVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCurrentIndexEngineVersion'
type MockVersionManager_GetCurrentIndexEngineVersion_Call struct {
*mock.Call
}
// GetCurrentIndexEngineVersion is a helper method to define mock.On call
func (_e *MockVersionManager_Expecter) GetCurrentIndexEngineVersion() *MockVersionManager_GetCurrentIndexEngineVersion_Call {
return &MockVersionManager_GetCurrentIndexEngineVersion_Call{Call: _e.mock.On("GetCurrentIndexEngineVersion")}
}
func (_c *MockVersionManager_GetCurrentIndexEngineVersion_Call) Run(run func()) *MockVersionManager_GetCurrentIndexEngineVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockVersionManager_GetCurrentIndexEngineVersion_Call) Return(_a0 int32) *MockVersionManager_GetCurrentIndexEngineVersion_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockVersionManager_GetCurrentIndexEngineVersion_Call) RunAndReturn(run func() int32) *MockVersionManager_GetCurrentIndexEngineVersion_Call {
_c.Call.Return(run)
return _c
}
// GetMinimalIndexEngineVersion provides a mock function with given fields:
func (_m *MockVersionManager) GetMinimalIndexEngineVersion() int32 {
ret := _m.Called()
var r0 int32
if rf, ok := ret.Get(0).(func() int32); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int32)
}
return r0
}
// MockVersionManager_GetMinimalIndexEngineVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMinimalIndexEngineVersion'
type MockVersionManager_GetMinimalIndexEngineVersion_Call struct {
*mock.Call
}
// GetMinimalIndexEngineVersion is a helper method to define mock.On call
func (_e *MockVersionManager_Expecter) GetMinimalIndexEngineVersion() *MockVersionManager_GetMinimalIndexEngineVersion_Call {
return &MockVersionManager_GetMinimalIndexEngineVersion_Call{Call: _e.mock.On("GetMinimalIndexEngineVersion")}
}
func (_c *MockVersionManager_GetMinimalIndexEngineVersion_Call) Run(run func()) *MockVersionManager_GetMinimalIndexEngineVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockVersionManager_GetMinimalIndexEngineVersion_Call) Return(_a0 int32) *MockVersionManager_GetMinimalIndexEngineVersion_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockVersionManager_GetMinimalIndexEngineVersion_Call) RunAndReturn(run func() int32) *MockVersionManager_GetMinimalIndexEngineVersion_Call {
_c.Call.Return(run)
return _c
}
// RemoveNode provides a mock function with given fields: session
func (_m *MockVersionManager) RemoveNode(session *sessionutil.Session) {
_m.Called(session)
}
// MockVersionManager_RemoveNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveNode'
type MockVersionManager_RemoveNode_Call struct {
*mock.Call
}
// RemoveNode is a helper method to define mock.On call
// - session *sessionutil.Session
func (_e *MockVersionManager_Expecter) RemoveNode(session interface{}) *MockVersionManager_RemoveNode_Call {
return &MockVersionManager_RemoveNode_Call{Call: _e.mock.On("RemoveNode", session)}
}
func (_c *MockVersionManager_RemoveNode_Call) Run(run func(session *sessionutil.Session)) *MockVersionManager_RemoveNode_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*sessionutil.Session))
})
return _c
}
func (_c *MockVersionManager_RemoveNode_Call) Return() *MockVersionManager_RemoveNode_Call {
_c.Call.Return()
return _c
}
func (_c *MockVersionManager_RemoveNode_Call) RunAndReturn(run func(*sessionutil.Session)) *MockVersionManager_RemoveNode_Call {
_c.Call.Return(run)
return _c
}
// Startup provides a mock function with given fields: sessions
func (_m *MockVersionManager) Startup(sessions map[string]*sessionutil.Session) {
_m.Called(sessions)
}
// MockVersionManager_Startup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Startup'
type MockVersionManager_Startup_Call struct {
*mock.Call
}
// Startup is a helper method to define mock.On call
// - sessions map[string]*sessionutil.Session
func (_e *MockVersionManager_Expecter) Startup(sessions interface{}) *MockVersionManager_Startup_Call {
return &MockVersionManager_Startup_Call{Call: _e.mock.On("Startup", sessions)}
}
func (_c *MockVersionManager_Startup_Call) Run(run func(sessions map[string]*sessionutil.Session)) *MockVersionManager_Startup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]*sessionutil.Session))
})
return _c
}
func (_c *MockVersionManager_Startup_Call) Return() *MockVersionManager_Startup_Call {
_c.Call.Return()
return _c
}
func (_c *MockVersionManager_Startup_Call) RunAndReturn(run func(map[string]*sessionutil.Session)) *MockVersionManager_Startup_Call {
_c.Call.Return(run)
return _c
}
// Update provides a mock function with given fields: session
func (_m *MockVersionManager) Update(session *sessionutil.Session) {
_m.Called(session)
}
// MockVersionManager_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update'
type MockVersionManager_Update_Call struct {
*mock.Call
}
// Update is a helper method to define mock.On call
// - session *sessionutil.Session
func (_e *MockVersionManager_Expecter) Update(session interface{}) *MockVersionManager_Update_Call {
return &MockVersionManager_Update_Call{Call: _e.mock.On("Update", session)}
}
func (_c *MockVersionManager_Update_Call) Run(run func(session *sessionutil.Session)) *MockVersionManager_Update_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*sessionutil.Session))
})
return _c
}
func (_c *MockVersionManager_Update_Call) Return() *MockVersionManager_Update_Call {
_c.Call.Return()
return _c
}
func (_c *MockVersionManager_Update_Call) RunAndReturn(run func(*sessionutil.Session)) *MockVersionManager_Update_Call {
_c.Call.Return(run)
return _c
}
// NewMockVersionManager creates a new instance of MockVersionManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockVersionManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockVersionManager {
mock := &MockVersionManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -149,7 +149,7 @@ type Server struct {
// segReferManager *SegmentReferenceManager
indexBuilder *indexBuilder
indexNodeManager *IndexNodeManager
indexEngineVersionManager *IndexEngineVersionManager
indexEngineVersionManager IndexEngineVersionManager
// manage ways that data coord access other coord
broker Broker
@ -454,7 +454,7 @@ func (s *Server) stopCompactionHandler() {
}
func (s *Server) createCompactionTrigger() {
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler)
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}
func (s *Server) stopCompactionTrigger() {

View File

@ -360,7 +360,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
buildIndexLatency := it.tr.RecordSpan()
metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(buildIndexLatency.Seconds())
log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID), zap.Int32("currentIndexVersion", it.currentIndexVersion))
return nil
}

View File

@ -4445,17 +4445,17 @@ func (_c *MockProxy_RenameCollection_Call) RunAndReturn(run func(context.Context
return _c
}
// ReplicateMessage provides a mock function with given fields: ctx, req
func (_m *MockProxy) ReplicateMessage(ctx context.Context, req *milvuspb.ReplicateMessageRequest) (*milvuspb.ReplicateMessageResponse, error) {
ret := _m.Called(ctx, req)
// ReplicateMessage provides a mock function with given fields: _a0, _a1
func (_m *MockProxy) ReplicateMessage(_a0 context.Context, _a1 *milvuspb.ReplicateMessageRequest) (*milvuspb.ReplicateMessageResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.ReplicateMessageResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.ReplicateMessageRequest) (*milvuspb.ReplicateMessageResponse, error)); ok {
return rf(ctx, req)
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.ReplicateMessageRequest) *milvuspb.ReplicateMessageResponse); ok {
r0 = rf(ctx, req)
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.ReplicateMessageResponse)
@ -4463,7 +4463,7 @@ func (_m *MockProxy) ReplicateMessage(ctx context.Context, req *milvuspb.Replica
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.ReplicateMessageRequest) error); ok {
r1 = rf(ctx, req)
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
@ -4477,13 +4477,13 @@ type MockProxy_ReplicateMessage_Call struct {
}
// ReplicateMessage is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.ReplicateMessageRequest
func (_e *MockProxy_Expecter) ReplicateMessage(ctx interface{}, req interface{}) *MockProxy_ReplicateMessage_Call {
return &MockProxy_ReplicateMessage_Call{Call: _e.mock.On("ReplicateMessage", ctx, req)}
// - _a0 context.Context
// - _a1 *milvuspb.ReplicateMessageRequest
func (_e *MockProxy_Expecter) ReplicateMessage(_a0 interface{}, _a1 interface{}) *MockProxy_ReplicateMessage_Call {
return &MockProxy_ReplicateMessage_Call{Call: _e.mock.On("ReplicateMessage", _a0, _a1)}
}
func (_c *MockProxy_ReplicateMessage_Call) Run(run func(ctx context.Context, req *milvuspb.ReplicateMessageRequest)) *MockProxy_ReplicateMessage_Call {
func (_c *MockProxy_ReplicateMessage_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.ReplicateMessageRequest)) *MockProxy_ReplicateMessage_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.ReplicateMessageRequest))
})