fix set dirty segment distribution to leader view (#26180)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/26264/head
wei liu 2023-08-11 11:21:32 +08:00 committed by GitHub
parent 6e1f284485
commit b47a72bfcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 427 additions and 100 deletions

View File

@ -348,6 +348,8 @@ generate-mockery: getdeps
# internal/querynodev2
$(PWD)/bin/mockery --name=Manager --dir=$(PWD)/internal/querynodev2/cluster --output=$(PWD)/internal/querynodev2/cluster --filename=mock_manager.go --with-expecter --outpkg=cluster --structname=MockManager --inpackage
$(PWD)/bin/mockery --name=Loader --dir=$(PWD)/internal/querynodev2/segments --output=$(PWD)/internal/querynodev2/segments --filename=mock_loader.go --with-expecter --outpkg=segments --structname=MockLoader --inpackage
$(PWD)/bin/mockery --name=SegmentManager --dir=$(PWD)/internal/querynodev2/segments --output=$(PWD)/internal/querynodev2/segments --filename=mock_segment_manager.go --with-expecter --outpkg=segments --structname=MockSegmentManager --inpackage
$(PWD)/bin/mockery --name=Segment --dir=$(PWD)/internal/querynodev2/segments --output=$(PWD)/internal/querynodev2/segments --filename=mock_segment.go --with-expecter --outpkg=segments --structname=MockSegment --inpackage
$(PWD)/bin/mockery --name=Worker --dir=$(PWD)/internal/querynodev2/cluster --output=$(PWD)/internal/querynodev2/cluster --filename=mock_worker.go --with-expecter --outpkg=worker --structname=MockWorker --inpackage
$(PWD)/bin/mockery --name=ShardDelegator --dir=$(PWD)/internal/querynodev2/delegator/ --output=$(PWD)/internal/querynodev2/delegator/ --filename=mock_delegator.go --with-expecter --outpkg=delegator --structname=MockShardDelegator --inpackage
# internal/datacoord

View File

@ -184,6 +184,12 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
}
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
log.Debug("leader observer append a segment to set",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
zap.Int64("leaderViewID", leaderView.ID),
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Set,
PartitionID: s.GetPartitionID(),
@ -210,11 +216,12 @@ func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, di
if ok || existInCurrentTarget || existInNextTarget {
continue
}
log.Debug("leader observer append a segment to remove:", zap.Int64("collectionID", leaderView.CollectionID),
zap.String("Channel", leaderView.Channel), zap.Int64("leaderViewID", leaderView.ID),
zap.Int64("segmentID", sid), zap.Bool("distMap_exist", ok),
zap.Bool("existInCurrentTarget", existInCurrentTarget),
zap.Bool("existInNextTarget", existInNextTarget))
log.Debug("leader observer append a segment to remove",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
zap.Int64("leaderViewID", leaderView.ID),
zap.Int64("segmentID", sid),
zap.Int64("nodeID", s.NodeID))
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Remove,
SegmentID: sid,

View File

@ -914,11 +914,13 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
}
// Check whether QueryNodes are online and available
isAvailable := true
for _, version := range leader.Segments {
for id, version := range leader.Segments {
info := s.nodeMgr.Get(version.GetNodeID())
err = checkNodeAvailable(version.GetNodeID(), info)
if err != nil {
log.Info("leader is not available due to QueryNode unavailable", zap.Error(err))
log.Info("leader is not available due to QueryNode unavailable",
zap.Int64("segmentID", id),
zap.Error(err))
isAvailable = false
multierr.AppendInto(&channelErr, err)
break

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
// Code generated by mockery v2.21.1. DO NOT EDIT.
package cluster
@ -22,6 +22,10 @@ func (_m *MockManager) GetWorker(nodeID int64) (Worker, error) {
ret := _m.Called(nodeID)
var r0 Worker
var r1 error
if rf, ok := ret.Get(0).(func(int64) (Worker, error)); ok {
return rf(nodeID)
}
if rf, ok := ret.Get(0).(func(int64) Worker); ok {
r0 = rf(nodeID)
} else {
@ -30,7 +34,6 @@ func (_m *MockManager) GetWorker(nodeID int64) (Worker, error) {
}
}
var r1 error
if rf, ok := ret.Get(1).(func(int64) error); ok {
r1 = rf(nodeID)
} else {
@ -46,7 +49,7 @@ type MockManager_GetWorker_Call struct {
}
// GetWorker is a helper method to define mock.On call
// - nodeID int64
// - nodeID int64
func (_e *MockManager_Expecter) GetWorker(nodeID interface{}) *MockManager_GetWorker_Call {
return &MockManager_GetWorker_Call{Call: _e.mock.On("GetWorker", nodeID)}
}
@ -63,6 +66,11 @@ func (_c *MockManager_GetWorker_Call) Return(_a0 Worker, _a1 error) *MockManager
return _c
}
func (_c *MockManager_GetWorker_Call) RunAndReturn(run func(int64) (Worker, error)) *MockManager_GetWorker_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewMockManager interface {
mock.TestingT
Cleanup(func())

View File

@ -171,9 +171,21 @@ func (d *distribution) AddDistributions(entries ...SegmentEntry) {
defer d.mut.Unlock()
for _, entry := range entries {
if s, ok := d.sealedSegments[entry.SegmentID]; ok {
oldEntry, ok := d.sealedSegments[entry.SegmentID]
if ok && oldEntry.Version >= entry.Version {
log.Warn("Invalid segment distribution changed, skip it",
zap.Int64("segmentID", entry.SegmentID),
zap.Int64("oldVersion", oldEntry.Version),
zap.Int64("oldNode", oldEntry.NodeID),
zap.Int64("newVersion", entry.Version),
zap.Int64("newNode", entry.NodeID),
)
continue
}
if ok {
// remain the target version for already loaded segment to void skipping this segment when executing search
entry.TargetVersion = s.TargetVersion
entry.TargetVersion = oldEntry.TargetVersion
}
d.sealedSegments[entry.SegmentID] = entry
d.offlines.Remove(entry.SegmentID)

View File

@ -76,6 +76,31 @@ func (s *DistributionSuite) TestAddDistribution() {
},
expectedSignalClosed: true,
},
{
tag: "duplicate segment",
input: []SegmentEntry{
{
NodeID: 1,
SegmentID: 1,
},
{
NodeID: 1,
SegmentID: 1,
},
},
expected: []SnapshotItem{
{
NodeID: 1,
Segments: []SegmentEntry{
{
NodeID: 1,
SegmentID: 1,
},
},
},
},
expectedSignalClosed: true,
},
{
tag: "multiple_nodes",
input: []SegmentEntry{

View File

@ -126,7 +126,7 @@ func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsR
continue
}
err := node.loader.LoadIndex(ctx, localSegment, info)
err := node.loader.LoadIndex(ctx, localSegment, info, req.Version)
if err != nil {
log.Warn("failed to load index", zap.Error(err))
status = merr.Status(err)

View File

@ -30,9 +30,11 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
"go.uber.org/zap"
)
type SegmentFilter func(segment Segment) bool
@ -98,6 +100,8 @@ type SegmentManager interface {
Remove(segmentID UniqueID, scope querypb.DataScope) (int, int)
RemoveBy(filters ...SegmentFilter) (int, int)
Clear()
UpdateSegmentVersion(segmentType SegmentType, segmentID int64, newVersion int64)
}
var _ SegmentManager = (*segmentManager)(nil)
@ -132,31 +136,76 @@ func (mgr *segmentManager) Put(segmentType SegmentType, segments ...Segment) {
}
for _, segment := range segments {
if _, ok := targetMap[segment.ID()]; ok {
oldSegment, ok := targetMap[segment.ID()]
if ok && oldSegment.Version() >= segment.Version() {
log.Warn("Invalid segment distribution changed, skip it",
zap.Int64("segmentID", segment.ID()),
zap.Int64("oldVersion", oldSegment.Version()),
zap.Int64("newVersion", segment.Version()),
)
continue
}
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection())))
targetMap[segment.ID()] = segment
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Inc()
if segment.RowNum() > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(
if !ok {
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection())))
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Add(float64(segment.RowNum()))
).Inc()
if segment.RowNum() > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.Collection()),
fmt.Sprint(segment.Partition()),
segment.Type().String(),
fmt.Sprint(len(segment.Indexes())),
).Add(float64(segment.RowNum()))
}
}
}
mgr.updateMetric()
}
func (mgr *segmentManager) UpdateSegmentVersion(segmentType SegmentType, segmentID int64, newVersion int64) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
targetMap := mgr.growingSegments
switch segmentType {
case SegmentTypeGrowing:
targetMap = mgr.growingSegments
case SegmentTypeSealed:
targetMap = mgr.sealedSegments
default:
panic("unexpected segment type")
}
segment, ok := targetMap[segmentID]
if !ok {
log.Warn("segment not exist, skip segment version change",
zap.Int64("segmentID", segmentID),
zap.Int64("newVersion", newVersion),
)
return
}
if segment.Version() >= newVersion {
log.Warn("Invalid segment version changed, skip it",
zap.Int64("segmentID", segment.ID()),
zap.Int64("oldVersion", segment.Version()),
zap.Int64("newVersion", newVersion))
return
}
segment.UpdateVersion(newVersion)
targetMap[segmentID] = segment
}
func (mgr *segmentManager) Get(segmentID UniqueID) Segment {
mgr.mu.RLock()
defer mgr.mu.RUnlock()

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
// Code generated by mockery v2.21.1. DO NOT EDIT.
package segments
@ -41,6 +41,10 @@ func (_m *MockLoader) Load(ctx context.Context, collectionID int64, segmentType
ret := _m.Called(_ca...)
var r0 []Segment
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, commonpb.SegmentState, int64, ...*querypb.SegmentLoadInfo) ([]Segment, error)); ok {
return rf(ctx, collectionID, segmentType, version, segments...)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, commonpb.SegmentState, int64, ...*querypb.SegmentLoadInfo) []Segment); ok {
r0 = rf(ctx, collectionID, segmentType, version, segments...)
} else {
@ -49,7 +53,6 @@ func (_m *MockLoader) Load(ctx context.Context, collectionID int64, segmentType
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, int64, commonpb.SegmentState, int64, ...*querypb.SegmentLoadInfo) error); ok {
r1 = rf(ctx, collectionID, segmentType, version, segments...)
} else {
@ -65,11 +68,11 @@ type MockLoader_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - segmentType commonpb.SegmentState
// - version int64
// - segments ...*querypb.SegmentLoadInfo
// - ctx context.Context
// - collectionID int64
// - segmentType commonpb.SegmentState
// - version int64
// - segments ...*querypb.SegmentLoadInfo
func (_e *MockLoader_Expecter) Load(ctx interface{}, collectionID interface{}, segmentType interface{}, version interface{}, segments ...interface{}) *MockLoader_Load_Call {
return &MockLoader_Load_Call{Call: _e.mock.On("Load",
append([]interface{}{ctx, collectionID, segmentType, version}, segments...)...)}
@ -93,6 +96,11 @@ func (_c *MockLoader_Load_Call) Return(_a0 []Segment, _a1 error) *MockLoader_Loa
return _c
}
func (_c *MockLoader_Load_Call) RunAndReturn(run func(context.Context, int64, commonpb.SegmentState, int64, ...*querypb.SegmentLoadInfo) ([]Segment, error)) *MockLoader_Load_Call {
_c.Call.Return(run)
return _c
}
// LoadBloomFilterSet provides a mock function with given fields: ctx, collectionID, version, infos
func (_m *MockLoader) LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error) {
_va := make([]interface{}, len(infos))
@ -105,6 +113,10 @@ func (_m *MockLoader) LoadBloomFilterSet(ctx context.Context, collectionID int64
ret := _m.Called(_ca...)
var r0 []*pkoracle.BloomFilterSet
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, int64, ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error)); ok {
return rf(ctx, collectionID, version, infos...)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, int64, ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet); ok {
r0 = rf(ctx, collectionID, version, infos...)
} else {
@ -113,7 +125,6 @@ func (_m *MockLoader) LoadBloomFilterSet(ctx context.Context, collectionID int64
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, int64, int64, ...*querypb.SegmentLoadInfo) error); ok {
r1 = rf(ctx, collectionID, version, infos...)
} else {
@ -129,10 +140,10 @@ type MockLoader_LoadBloomFilterSet_Call struct {
}
// LoadBloomFilterSet is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - version int64
// - infos ...*querypb.SegmentLoadInfo
// - ctx context.Context
// - collectionID int64
// - version int64
// - infos ...*querypb.SegmentLoadInfo
func (_e *MockLoader_Expecter) LoadBloomFilterSet(ctx interface{}, collectionID interface{}, version interface{}, infos ...interface{}) *MockLoader_LoadBloomFilterSet_Call {
return &MockLoader_LoadBloomFilterSet_Call{Call: _e.mock.On("LoadBloomFilterSet",
append([]interface{}{ctx, collectionID, version}, infos...)...)}
@ -156,6 +167,11 @@ func (_c *MockLoader_LoadBloomFilterSet_Call) Return(_a0 []*pkoracle.BloomFilter
return _c
}
func (_c *MockLoader_LoadBloomFilterSet_Call) RunAndReturn(run func(context.Context, int64, int64, ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error)) *MockLoader_LoadBloomFilterSet_Call {
_c.Call.Return(run)
return _c
}
// LoadDeltaLogs provides a mock function with given fields: ctx, segment, deltaLogs
func (_m *MockLoader) LoadDeltaLogs(ctx context.Context, segment *LocalSegment, deltaLogs []*datapb.FieldBinlog) error {
ret := _m.Called(ctx, segment, deltaLogs)
@ -176,9 +192,9 @@ type MockLoader_LoadDeltaLogs_Call struct {
}
// LoadDeltaLogs is a helper method to define mock.On call
// - ctx context.Context
// - segment *LocalSegment
// - deltaLogs []*datapb.FieldBinlog
// - ctx context.Context
// - segment *LocalSegment
// - deltaLogs []*datapb.FieldBinlog
func (_e *MockLoader_Expecter) LoadDeltaLogs(ctx interface{}, segment interface{}, deltaLogs interface{}) *MockLoader_LoadDeltaLogs_Call {
return &MockLoader_LoadDeltaLogs_Call{Call: _e.mock.On("LoadDeltaLogs", ctx, segment, deltaLogs)}
}
@ -195,13 +211,18 @@ func (_c *MockLoader_LoadDeltaLogs_Call) Return(_a0 error) *MockLoader_LoadDelta
return _c
}
// LoadIndex provides a mock function with given fields: ctx, segment, info
func (_m *MockLoader) LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo) error {
ret := _m.Called(ctx, segment, info)
func (_c *MockLoader_LoadDeltaLogs_Call) RunAndReturn(run func(context.Context, *LocalSegment, []*datapb.FieldBinlog) error) *MockLoader_LoadDeltaLogs_Call {
_c.Call.Return(run)
return _c
}
// LoadIndex provides a mock function with given fields: ctx, segment, info, version
func (_m *MockLoader) LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64) error {
ret := _m.Called(ctx, segment, info, version)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo) error); ok {
r0 = rf(ctx, segment, info)
if rf, ok := ret.Get(0).(func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo, int64) error); ok {
r0 = rf(ctx, segment, info, version)
} else {
r0 = ret.Error(0)
}
@ -215,16 +236,17 @@ type MockLoader_LoadIndex_Call struct {
}
// LoadIndex is a helper method to define mock.On call
// - ctx context.Context
// - segment *LocalSegment
// - info *querypb.SegmentLoadInfo
func (_e *MockLoader_Expecter) LoadIndex(ctx interface{}, segment interface{}, info interface{}) *MockLoader_LoadIndex_Call {
return &MockLoader_LoadIndex_Call{Call: _e.mock.On("LoadIndex", ctx, segment, info)}
// - ctx context.Context
// - segment *LocalSegment
// - info *querypb.SegmentLoadInfo
// - version int64
func (_e *MockLoader_Expecter) LoadIndex(ctx interface{}, segment interface{}, info interface{}, version interface{}) *MockLoader_LoadIndex_Call {
return &MockLoader_LoadIndex_Call{Call: _e.mock.On("LoadIndex", ctx, segment, info, version)}
}
func (_c *MockLoader_LoadIndex_Call) Run(run func(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo)) *MockLoader_LoadIndex_Call {
func (_c *MockLoader_LoadIndex_Call) Run(run func(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64)) *MockLoader_LoadIndex_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*LocalSegment), args[2].(*querypb.SegmentLoadInfo))
run(args[0].(context.Context), args[1].(*LocalSegment), args[2].(*querypb.SegmentLoadInfo), args[3].(int64))
})
return _c
}
@ -234,6 +256,11 @@ func (_c *MockLoader_LoadIndex_Call) Return(_a0 error) *MockLoader_LoadIndex_Cal
return _c
}
func (_c *MockLoader_LoadIndex_Call) RunAndReturn(run func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo, int64) error) *MockLoader_LoadIndex_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewMockLoader interface {
mock.TestingT
Cleanup(func())

View File

@ -1,13 +1,13 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
// Code generated by mockery v2.21.1. DO NOT EDIT.
package segments
import (
commonpb "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
mock "github.com/stretchr/testify/mock"
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
segcorepb "github.com/milvus-io/milvus/internal/proto/segcorepb"
storage "github.com/milvus-io/milvus/internal/storage"
@ -55,6 +55,11 @@ func (_c *MockSegment_AddIndex_Call) Return() *MockSegment_AddIndex_Call {
return _c
}
func (_c *MockSegment_AddIndex_Call) RunAndReturn(run func(int64, *IndexedFieldInfo)) *MockSegment_AddIndex_Call {
_c.Call.Return(run)
return _c
}
// Collection provides a mock function with given fields:
func (_m *MockSegment) Collection() int64 {
ret := _m.Called()
@ -91,6 +96,11 @@ func (_c *MockSegment_Collection_Call) Return(_a0 int64) *MockSegment_Collection
return _c
}
func (_c *MockSegment_Collection_Call) RunAndReturn(run func() int64) *MockSegment_Collection_Call {
_c.Call.Return(run)
return _c
}
// Delete provides a mock function with given fields: primaryKeys, timestamps
func (_m *MockSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []uint64) error {
ret := _m.Called(primaryKeys, timestamps)
@ -129,6 +139,11 @@ func (_c *MockSegment_Delete_Call) Return(_a0 error) *MockSegment_Delete_Call {
return _c
}
func (_c *MockSegment_Delete_Call) RunAndReturn(run func([]storage.PrimaryKey, []uint64) error) *MockSegment_Delete_Call {
_c.Call.Return(run)
return _c
}
// ExistIndex provides a mock function with given fields: fieldID
func (_m *MockSegment) ExistIndex(fieldID int64) bool {
ret := _m.Called(fieldID)
@ -166,6 +181,11 @@ func (_c *MockSegment_ExistIndex_Call) Return(_a0 bool) *MockSegment_ExistIndex_
return _c
}
func (_c *MockSegment_ExistIndex_Call) RunAndReturn(run func(int64) bool) *MockSegment_ExistIndex_Call {
_c.Call.Return(run)
return _c
}
// GetIndex provides a mock function with given fields: fieldID
func (_m *MockSegment) GetIndex(fieldID int64) *IndexedFieldInfo {
ret := _m.Called(fieldID)
@ -205,6 +225,11 @@ func (_c *MockSegment_GetIndex_Call) Return(_a0 *IndexedFieldInfo) *MockSegment_
return _c
}
func (_c *MockSegment_GetIndex_Call) RunAndReturn(run func(int64) *IndexedFieldInfo) *MockSegment_GetIndex_Call {
_c.Call.Return(run)
return _c
}
// ID provides a mock function with given fields:
func (_m *MockSegment) ID() int64 {
ret := _m.Called()
@ -241,6 +266,11 @@ func (_c *MockSegment_ID_Call) Return(_a0 int64) *MockSegment_ID_Call {
return _c
}
func (_c *MockSegment_ID_Call) RunAndReturn(run func() int64) *MockSegment_ID_Call {
_c.Call.Return(run)
return _c
}
// Indexes provides a mock function with given fields:
func (_m *MockSegment) Indexes() []*IndexedFieldInfo {
ret := _m.Called()
@ -279,6 +309,11 @@ func (_c *MockSegment_Indexes_Call) Return(_a0 []*IndexedFieldInfo) *MockSegment
return _c
}
func (_c *MockSegment_Indexes_Call) RunAndReturn(run func() []*IndexedFieldInfo) *MockSegment_Indexes_Call {
_c.Call.Return(run)
return _c
}
// Insert provides a mock function with given fields: rowIDs, timestamps, record
func (_m *MockSegment) Insert(rowIDs []int64, timestamps []uint64, record *segcorepb.InsertRecord) error {
ret := _m.Called(rowIDs, timestamps, record)
@ -318,6 +353,11 @@ func (_c *MockSegment_Insert_Call) Return(_a0 error) *MockSegment_Insert_Call {
return _c
}
func (_c *MockSegment_Insert_Call) RunAndReturn(run func([]int64, []uint64, *segcorepb.InsertRecord) error) *MockSegment_Insert_Call {
_c.Call.Return(run)
return _c
}
// InsertCount provides a mock function with given fields:
func (_m *MockSegment) InsertCount() int64 {
ret := _m.Called()
@ -354,6 +394,11 @@ func (_c *MockSegment_InsertCount_Call) Return(_a0 int64) *MockSegment_InsertCou
return _c
}
func (_c *MockSegment_InsertCount_Call) RunAndReturn(run func() int64) *MockSegment_InsertCount_Call {
_c.Call.Return(run)
return _c
}
// LastDeltaTimestamp provides a mock function with given fields:
func (_m *MockSegment) LastDeltaTimestamp() uint64 {
ret := _m.Called()
@ -390,6 +435,11 @@ func (_c *MockSegment_LastDeltaTimestamp_Call) Return(_a0 uint64) *MockSegment_L
return _c
}
func (_c *MockSegment_LastDeltaTimestamp_Call) RunAndReturn(run func() uint64) *MockSegment_LastDeltaTimestamp_Call {
_c.Call.Return(run)
return _c
}
// MayPkExist provides a mock function with given fields: pk
func (_m *MockSegment) MayPkExist(pk storage.PrimaryKey) bool {
ret := _m.Called(pk)
@ -427,6 +477,11 @@ func (_c *MockSegment_MayPkExist_Call) Return(_a0 bool) *MockSegment_MayPkExist_
return _c
}
func (_c *MockSegment_MayPkExist_Call) RunAndReturn(run func(storage.PrimaryKey) bool) *MockSegment_MayPkExist_Call {
_c.Call.Return(run)
return _c
}
// MemSize provides a mock function with given fields:
func (_m *MockSegment) MemSize() int64 {
ret := _m.Called()
@ -463,6 +518,11 @@ func (_c *MockSegment_MemSize_Call) Return(_a0 int64) *MockSegment_MemSize_Call
return _c
}
func (_c *MockSegment_MemSize_Call) RunAndReturn(run func() int64) *MockSegment_MemSize_Call {
_c.Call.Return(run)
return _c
}
// Partition provides a mock function with given fields:
func (_m *MockSegment) Partition() int64 {
ret := _m.Called()
@ -499,6 +559,11 @@ func (_c *MockSegment_Partition_Call) Return(_a0 int64) *MockSegment_Partition_C
return _c
}
func (_c *MockSegment_Partition_Call) RunAndReturn(run func() int64) *MockSegment_Partition_Call {
_c.Call.Return(run)
return _c
}
// RowNum provides a mock function with given fields:
func (_m *MockSegment) RowNum() int64 {
ret := _m.Called()
@ -535,6 +600,11 @@ func (_c *MockSegment_RowNum_Call) Return(_a0 int64) *MockSegment_RowNum_Call {
return _c
}
func (_c *MockSegment_RowNum_Call) RunAndReturn(run func() int64) *MockSegment_RowNum_Call {
_c.Call.Return(run)
return _c
}
// Shard provides a mock function with given fields:
func (_m *MockSegment) Shard() string {
ret := _m.Called()
@ -571,6 +641,11 @@ func (_c *MockSegment_Shard_Call) Return(_a0 string) *MockSegment_Shard_Call {
return _c
}
func (_c *MockSegment_Shard_Call) RunAndReturn(run func() string) *MockSegment_Shard_Call {
_c.Call.Return(run)
return _c
}
// StartPosition provides a mock function with given fields:
func (_m *MockSegment) StartPosition() *msgpb.MsgPosition {
ret := _m.Called()
@ -609,6 +684,11 @@ func (_c *MockSegment_StartPosition_Call) Return(_a0 *msgpb.MsgPosition) *MockSe
return _c
}
func (_c *MockSegment_StartPosition_Call) RunAndReturn(run func() *msgpb.MsgPosition) *MockSegment_StartPosition_Call {
_c.Call.Return(run)
return _c
}
// Type provides a mock function with given fields:
func (_m *MockSegment) Type() commonpb.SegmentState {
ret := _m.Called()
@ -645,6 +725,11 @@ func (_c *MockSegment_Type_Call) Return(_a0 commonpb.SegmentState) *MockSegment_
return _c
}
func (_c *MockSegment_Type_Call) RunAndReturn(run func() commonpb.SegmentState) *MockSegment_Type_Call {
_c.Call.Return(run)
return _c
}
// UpdateBloomFilter provides a mock function with given fields: pks
func (_m *MockSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
_m.Called(pks)
@ -673,6 +758,44 @@ func (_c *MockSegment_UpdateBloomFilter_Call) Return() *MockSegment_UpdateBloomF
return _c
}
func (_c *MockSegment_UpdateBloomFilter_Call) RunAndReturn(run func([]storage.PrimaryKey)) *MockSegment_UpdateBloomFilter_Call {
_c.Call.Return(run)
return _c
}
// UpdateVersion provides a mock function with given fields: version
func (_m *MockSegment) UpdateVersion(version int64) {
_m.Called(version)
}
// MockSegment_UpdateVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateVersion'
type MockSegment_UpdateVersion_Call struct {
*mock.Call
}
// UpdateVersion is a helper method to define mock.On call
// - version int64
func (_e *MockSegment_Expecter) UpdateVersion(version interface{}) *MockSegment_UpdateVersion_Call {
return &MockSegment_UpdateVersion_Call{Call: _e.mock.On("UpdateVersion", version)}
}
func (_c *MockSegment_UpdateVersion_Call) Run(run func(version int64)) *MockSegment_UpdateVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockSegment_UpdateVersion_Call) Return() *MockSegment_UpdateVersion_Call {
_c.Call.Return()
return _c
}
func (_c *MockSegment_UpdateVersion_Call) RunAndReturn(run func(int64)) *MockSegment_UpdateVersion_Call {
_c.Call.Return(run)
return _c
}
// Version provides a mock function with given fields:
func (_m *MockSegment) Version() int64 {
ret := _m.Called()
@ -709,6 +832,11 @@ func (_c *MockSegment_Version_Call) Return(_a0 int64) *MockSegment_Version_Call
return _c
}
func (_c *MockSegment_Version_Call) RunAndReturn(run func() int64) *MockSegment_Version_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewMockSegment interface {
mock.TestingT
Cleanup(func())

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.30.16. DO NOT EDIT.
// Code generated by mockery v2.21.1. DO NOT EDIT.
package segments
@ -495,12 +495,48 @@ func (_c *MockSegmentManager_RemoveBy_Call) RunAndReturn(run func(...SegmentFilt
return _c
}
// NewMockSegmentManager creates a new instance of MockSegmentManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockSegmentManager(t interface {
// UpdateSegmentVersion provides a mock function with given fields: segmentType, segmentID, newVersion
func (_m *MockSegmentManager) UpdateSegmentVersion(segmentType commonpb.SegmentState, segmentID int64, newVersion int64) {
_m.Called(segmentType, segmentID, newVersion)
}
// MockSegmentManager_UpdateSegmentVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegmentVersion'
type MockSegmentManager_UpdateSegmentVersion_Call struct {
*mock.Call
}
// UpdateSegmentVersion is a helper method to define mock.On call
// - segmentType commonpb.SegmentState
// - segmentID int64
// - newVersion int64
func (_e *MockSegmentManager_Expecter) UpdateSegmentVersion(segmentType interface{}, segmentID interface{}, newVersion interface{}) *MockSegmentManager_UpdateSegmentVersion_Call {
return &MockSegmentManager_UpdateSegmentVersion_Call{Call: _e.mock.On("UpdateSegmentVersion", segmentType, segmentID, newVersion)}
}
func (_c *MockSegmentManager_UpdateSegmentVersion_Call) Run(run func(segmentType commonpb.SegmentState, segmentID int64, newVersion int64)) *MockSegmentManager_UpdateSegmentVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(commonpb.SegmentState), args[1].(int64), args[2].(int64))
})
return _c
}
func (_c *MockSegmentManager_UpdateSegmentVersion_Call) Return() *MockSegmentManager_UpdateSegmentVersion_Call {
_c.Call.Return()
return _c
}
func (_c *MockSegmentManager_UpdateSegmentVersion_Call) RunAndReturn(run func(commonpb.SegmentState, int64, int64)) *MockSegmentManager_UpdateSegmentVersion_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewMockSegmentManager interface {
mock.TestingT
Cleanup(func())
}) *MockSegmentManager {
}
// NewMockSegmentManager creates a new instance of MockSegmentManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockSegmentManager(t mockConstructorTestingTNewMockSegmentManager) *MockSegmentManager {
mock := &MockSegmentManager{}
mock.Mock.Test(t)

View File

@ -73,39 +73,6 @@ type IndexedFieldInfo struct {
IndexInfo *querypb.FieldIndexInfo
}
type Segment interface {
// Properties
ID() int64
Collection() int64
Partition() int64
Shard() string
Version() int64
StartPosition() *msgpb.MsgPosition
Type() SegmentType
// Stats related
// InsertCount returns the number of inserted rows, not effected by deletion
InsertCount() int64
// RowNum returns the number of rows, it's slow, so DO NOT call it in a loop
RowNum() int64
MemSize() int64
// Index related
AddIndex(fieldID int64, index *IndexedFieldInfo)
GetIndex(fieldID int64) *IndexedFieldInfo
ExistIndex(fieldID int64) bool
Indexes() []*IndexedFieldInfo
// Modification related
Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error
Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error
LastDeltaTimestamp() uint64
// Bloom filter related
UpdateBloomFilter(pks []storage.PrimaryKey)
MayPkExist(pk storage.PrimaryKey) bool
}
type baseSegment struct {
segmentID int64
partitionID int64
@ -159,6 +126,10 @@ func (s *baseSegment) Version() int64 {
return s.version
}
func (s *baseSegment) UpdateVersion(version int64) {
s.version = version
}
func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
s.bloomFilterSet.UpdateBloomFilter(pks)
}

View File

@ -0,0 +1,58 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package segments
import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
storage "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type Segment interface {
// Properties
ID() int64
Collection() int64
Partition() int64
Shard() string
Version() int64
StartPosition() *msgpb.MsgPosition
Type() SegmentType
// Stats related
// InsertCount returns the number of inserted rows, not effected by deletion
InsertCount() int64
// RowNum returns the number of rows, it's slow, so DO NOT call it in a loop
RowNum() int64
MemSize() int64
// Index related
AddIndex(fieldID int64, index *IndexedFieldInfo)
GetIndex(fieldID int64) *IndexedFieldInfo
ExistIndex(fieldID int64) bool
Indexes() []*IndexedFieldInfo
// Modification related
Insert(rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error
Delete(primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error
LastDeltaTimestamp() uint64
// Bloom filter related
UpdateBloomFilter(pks []storage.PrimaryKey)
MayPkExist(pk storage.PrimaryKey) bool
UpdateVersion(version int64)
}

View File

@ -68,7 +68,7 @@ type Loader interface {
LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error)
// LoadIndex append index for segment and remove vector binlogs.
LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo) error
LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64) error
}
type LoadResource struct {
@ -148,7 +148,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
return nil, nil
}
// Filter out loaded & loading segments
infos := loader.prepare(segmentType, segments...)
infos := loader.prepare(segmentType, version, segments...)
defer loader.unregister(infos...)
// continue to wait other task done
@ -246,7 +246,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
return loaded, nil
}
func (loader *segmentLoader) prepare(segmentType SegmentType, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
loader.mut.Lock()
defer loader.mut.Unlock()
@ -259,6 +259,8 @@ func (loader *segmentLoader) prepare(segmentType SegmentType, segments ...*query
infos = append(infos, segment)
loader.loadingSegments.Insert(segment.GetSegmentID(), make(chan struct{}))
} else {
// try to update segment version before skip load operation
loader.manager.Segment.UpdateSegmentVersion(segmentType, segment.SegmentID, version)
log.Info("skip loaded/loading segment", zap.Int64("segmentID", segment.GetSegmentID()),
zap.Bool("isLoaded", len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) > 0),
zap.Bool("isLoading", loader.loadingSegments.Contain(segment.GetSegmentID())),
@ -934,7 +936,7 @@ func (loader *segmentLoader) getFieldType(collectionID, fieldID int64) (schemapb
return 0, merr.WrapErrFieldNotFound(fieldID)
}
func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo, version int64) error {
log := log.Ctx(ctx).With(
zap.Int64("collection", segment.Collection()),
zap.Int64("segment", segment.ID()),
@ -942,7 +944,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
// Filter out LOADING segments only
// use None to avoid loaded check
infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, loadInfo)
infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, version, loadInfo)
defer loader.unregister(infos...)
indexInfo := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *querypb.SegmentLoadInfo {

View File

@ -367,7 +367,7 @@ func (suite *SegmentLoaderSuite) TestLoadIndex() {
},
}
err := suite.loader.LoadIndex(ctx, segment, loadInfo)
err := suite.loader.LoadIndex(ctx, segment, loadInfo, 0)
suite.ErrorIs(err, merr.ErrIndexNotFound)
}

View File

@ -721,7 +721,7 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() {
suite.node.loader = loader
}()
mockLoader.EXPECT().LoadIndex(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mocked error"))
mockLoader.EXPECT().LoadIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mocked error"))
infos := suite.genSegmentLoadInfos(schema)
req := &querypb.LoadSegmentsRequest{