enhance: Avoid use concrete segment type in segments interfaces (#34521)

See also #34519

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/34558/head
congqixia 2024-07-10 10:18:12 +08:00 committed by GitHub
parent 3c7046353a
commit d60e628aed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 97 additions and 78 deletions

View File

@ -204,7 +204,7 @@ func NewManager() *Manager {
return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields")
}
err = manager.Loader.LoadLazySegment(ctx, segment.(*LocalSegment), info)
err = manager.Loader.LoadLazySegment(ctx, segment, info)
return nil, err
})
if err != nil {

View File

@ -217,11 +217,11 @@ func (_c *MockLoader_LoadDeltaLogs_Call) RunAndReturn(run func(context.Context,
}
// LoadIndex provides a mock function with given fields: ctx, segment, info, version
func (_m *MockLoader) LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64) error {
func (_m *MockLoader) LoadIndex(ctx context.Context, segment Segment, info *querypb.SegmentLoadInfo, version int64) error {
ret := _m.Called(ctx, segment, info, version)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo, int64) error); ok {
if rf, ok := ret.Get(0).(func(context.Context, Segment, *querypb.SegmentLoadInfo, int64) error); ok {
r0 = rf(ctx, segment, info, version)
} else {
r0 = ret.Error(0)
@ -237,16 +237,16 @@ type MockLoader_LoadIndex_Call struct {
// LoadIndex is a helper method to define mock.On call
// - ctx context.Context
// - segment *LocalSegment
// - segment Segment
// - info *querypb.SegmentLoadInfo
// - version int64
func (_e *MockLoader_Expecter) LoadIndex(ctx interface{}, segment interface{}, info interface{}, version interface{}) *MockLoader_LoadIndex_Call {
return &MockLoader_LoadIndex_Call{Call: _e.mock.On("LoadIndex", ctx, segment, info, version)}
}
func (_c *MockLoader_LoadIndex_Call) Run(run func(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64)) *MockLoader_LoadIndex_Call {
func (_c *MockLoader_LoadIndex_Call) Run(run func(ctx context.Context, segment Segment, info *querypb.SegmentLoadInfo, version int64)) *MockLoader_LoadIndex_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*LocalSegment), args[2].(*querypb.SegmentLoadInfo), args[3].(int64))
run(args[0].(context.Context), args[1].(Segment), args[2].(*querypb.SegmentLoadInfo), args[3].(int64))
})
return _c
}
@ -256,17 +256,17 @@ func (_c *MockLoader_LoadIndex_Call) Return(_a0 error) *MockLoader_LoadIndex_Cal
return _c
}
func (_c *MockLoader_LoadIndex_Call) RunAndReturn(run func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo, int64) error) *MockLoader_LoadIndex_Call {
func (_c *MockLoader_LoadIndex_Call) RunAndReturn(run func(context.Context, Segment, *querypb.SegmentLoadInfo, int64) error) *MockLoader_LoadIndex_Call {
_c.Call.Return(run)
return _c
}
// LoadLazySegment provides a mock function with given fields: ctx, segment, loadInfo
func (_m *MockLoader) LoadLazySegment(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
func (_m *MockLoader) LoadLazySegment(ctx context.Context, segment Segment, loadInfo *querypb.SegmentLoadInfo) error {
ret := _m.Called(ctx, segment, loadInfo)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo) error); ok {
if rf, ok := ret.Get(0).(func(context.Context, Segment, *querypb.SegmentLoadInfo) error); ok {
r0 = rf(ctx, segment, loadInfo)
} else {
r0 = ret.Error(0)
@ -282,15 +282,15 @@ type MockLoader_LoadLazySegment_Call struct {
// LoadLazySegment is a helper method to define mock.On call
// - ctx context.Context
// - segment *LocalSegment
// - segment Segment
// - loadInfo *querypb.SegmentLoadInfo
func (_e *MockLoader_Expecter) LoadLazySegment(ctx interface{}, segment interface{}, loadInfo interface{}) *MockLoader_LoadLazySegment_Call {
return &MockLoader_LoadLazySegment_Call{Call: _e.mock.On("LoadLazySegment", ctx, segment, loadInfo)}
}
func (_c *MockLoader_LoadLazySegment_Call) Run(run func(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo)) *MockLoader_LoadLazySegment_Call {
func (_c *MockLoader_LoadLazySegment_Call) Run(run func(ctx context.Context, segment Segment, loadInfo *querypb.SegmentLoadInfo)) *MockLoader_LoadLazySegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*LocalSegment), args[2].(*querypb.SegmentLoadInfo))
run(args[0].(context.Context), args[1].(Segment), args[2].(*querypb.SegmentLoadInfo))
})
return _c
}
@ -300,51 +300,7 @@ func (_c *MockLoader_LoadLazySegment_Call) Return(_a0 error) *MockLoader_LoadLaz
return _c
}
func (_c *MockLoader_LoadLazySegment_Call) RunAndReturn(run func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo) error) *MockLoader_LoadLazySegment_Call {
_c.Call.Return(run)
return _c
}
// LoadSegment provides a mock function with given fields: ctx, segment, loadInfo
func (_m *MockLoader) LoadSegment(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
ret := _m.Called(ctx, segment, loadInfo)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo) error); ok {
r0 = rf(ctx, segment, loadInfo)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockLoader_LoadSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadSegment'
type MockLoader_LoadSegment_Call struct {
*mock.Call
}
// LoadSegment is a helper method to define mock.On call
// - ctx context.Context
// - segment *LocalSegment
// - loadInfo *querypb.SegmentLoadInfo
func (_e *MockLoader_Expecter) LoadSegment(ctx interface{}, segment interface{}, loadInfo interface{}) *MockLoader_LoadSegment_Call {
return &MockLoader_LoadSegment_Call{Call: _e.mock.On("LoadSegment", ctx, segment, loadInfo)}
}
func (_c *MockLoader_LoadSegment_Call) Run(run func(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo)) *MockLoader_LoadSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*LocalSegment), args[2].(*querypb.SegmentLoadInfo))
})
return _c
}
func (_c *MockLoader_LoadSegment_Call) Return(_a0 error) *MockLoader_LoadSegment_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockLoader_LoadSegment_Call) RunAndReturn(run func(context.Context, *LocalSegment, *querypb.SegmentLoadInfo) error) *MockLoader_LoadSegment_Call {
func (_c *MockLoader_LoadLazySegment_Call) RunAndReturn(run func(context.Context, Segment, *querypb.SegmentLoadInfo) error) *MockLoader_LoadLazySegment_Call {
_c.Call.Return(run)
return _c
}

View File

@ -17,6 +17,8 @@ import (
querypb "github.com/milvus-io/milvus/internal/proto/querypb"
schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
segcorepb "github.com/milvus-io/milvus/internal/proto/segcorepb"
storage "github.com/milvus-io/milvus/internal/storage"
@ -51,30 +53,30 @@ func (_m *MockSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
return r0
}
// MockSegment_BatchTestLocationCache_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BatchPkExist'
type MockSegment_BatchTestLocationCache_Call struct {
// MockSegment_BatchPkExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BatchPkExist'
type MockSegment_BatchPkExist_Call struct {
*mock.Call
}
// BatchPkExist is a helper method to define mock.On call
// - lc *storage.BatchLocationsCache
func (_e *MockSegment_Expecter) BatchPkExist(lc interface{}) *MockSegment_BatchTestLocationCache_Call {
return &MockSegment_BatchTestLocationCache_Call{Call: _e.mock.On("BatchPkExist", lc)}
func (_e *MockSegment_Expecter) BatchPkExist(lc interface{}) *MockSegment_BatchPkExist_Call {
return &MockSegment_BatchPkExist_Call{Call: _e.mock.On("BatchPkExist", lc)}
}
func (_c *MockSegment_BatchTestLocationCache_Call) Run(run func(lc *storage.BatchLocationsCache)) *MockSegment_BatchTestLocationCache_Call {
func (_c *MockSegment_BatchPkExist_Call) Run(run func(lc *storage.BatchLocationsCache)) *MockSegment_BatchPkExist_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*storage.BatchLocationsCache))
})
return _c
}
func (_c *MockSegment_BatchTestLocationCache_Call) Return(_a0 []bool) *MockSegment_BatchTestLocationCache_Call {
func (_c *MockSegment_BatchPkExist_Call) Return(_a0 []bool) *MockSegment_BatchPkExist_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_BatchTestLocationCache_Call) RunAndReturn(run func(*storage.BatchLocationsCache) []bool) *MockSegment_BatchTestLocationCache_Call {
func (_c *MockSegment_BatchPkExist_Call) RunAndReturn(run func(*storage.BatchLocationsCache) []bool) *MockSegment_BatchPkExist_Call {
_c.Call.Return(run)
return _c
}
@ -712,6 +714,49 @@ func (_c *MockSegment_LoadDeltaData_Call) RunAndReturn(run func(context.Context,
return _c
}
// LoadDeltaData2 provides a mock function with given fields: ctx, schema
func (_m *MockSegment) LoadDeltaData2(ctx context.Context, schema *schemapb.CollectionSchema) error {
ret := _m.Called(ctx, schema)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *schemapb.CollectionSchema) error); ok {
r0 = rf(ctx, schema)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockSegment_LoadDeltaData2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadDeltaData2'
type MockSegment_LoadDeltaData2_Call struct {
*mock.Call
}
// LoadDeltaData2 is a helper method to define mock.On call
// - ctx context.Context
// - schema *schemapb.CollectionSchema
func (_e *MockSegment_Expecter) LoadDeltaData2(ctx interface{}, schema interface{}) *MockSegment_LoadDeltaData2_Call {
return &MockSegment_LoadDeltaData2_Call{Call: _e.mock.On("LoadDeltaData2", ctx, schema)}
}
func (_c *MockSegment_LoadDeltaData2_Call) Run(run func(ctx context.Context, schema *schemapb.CollectionSchema)) *MockSegment_LoadDeltaData2_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*schemapb.CollectionSchema))
})
return _c
}
func (_c *MockSegment_LoadDeltaData2_Call) Return(_a0 error) *MockSegment_LoadDeltaData2_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_LoadDeltaData2_Call) RunAndReturn(run func(context.Context, *schemapb.CollectionSchema) error) *MockSegment_LoadDeltaData2_Call {
_c.Call.Return(run)
return _c
}
// LoadInfo provides a mock function with given fields:
func (_m *MockSegment) LoadInfo() *querypb.SegmentLoadInfo {
ret := _m.Called()

View File

@ -20,6 +20,7 @@ import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
@ -78,6 +79,7 @@ type Segment interface {
Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error
Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error
LoadDeltaData(ctx context.Context, deltaData *storage.DeleteData) error
LoadDeltaData2(ctx context.Context, schema *schemapb.CollectionSchema) error // storageV2
LastDeltaTimestamp() uint64
Release(ctx context.Context, opts ...releaseOption)

View File

@ -23,6 +23,7 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
@ -160,6 +161,10 @@ func (s *L0Segment) LoadDeltaData(ctx context.Context, deltaData *storage.Delete
return nil
}
func (s *L0Segment) LoadDeltaData2(ctx context.Context, schema *schemapb.CollectionSchema) error {
return merr.WrapErrServiceInternal("not implemented")
}
func (s *L0Segment) DeleteRecords() ([]storage.PrimaryKey, []uint64) {
s.dataGuard.RLock()
defer s.dataGuard.RUnlock()

View File

@ -81,15 +81,13 @@ type Loader interface {
LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error)
// LoadIndex append index for segment and remove vector binlogs.
LoadIndex(ctx context.Context, segment *LocalSegment, info *querypb.SegmentLoadInfo, version int64) error
LoadSegment(ctx context.Context,
segment *LocalSegment,
loadInfo *querypb.SegmentLoadInfo,
) error
LoadIndex(ctx context.Context,
segment Segment,
info *querypb.SegmentLoadInfo,
version int64) error
LoadLazySegment(ctx context.Context,
segment *LocalSegment,
segment Segment,
loadInfo *querypb.SegmentLoadInfo,
) error
}
@ -140,7 +138,7 @@ func NewLoaderV2(
}
}
func (loader *segmentLoaderV2) LoadDelta(ctx context.Context, collectionID int64, segment *LocalSegment) error {
func (loader *segmentLoaderV2) LoadDelta(ctx context.Context, collectionID int64, segment Segment) error {
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
err := merr.WrapErrCollectionNotFound(collectionID)
@ -230,7 +228,7 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
var err error
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
err = loader.LoadDelta(ctx, collectionID, segment.(*LocalSegment))
err = loader.LoadDelta(ctx, collectionID, segment)
} else {
err = loader.LoadSegment(ctx, segment.(*LocalSegment), loadInfo)
}
@ -390,9 +388,10 @@ func (loader *segmentLoaderV2) loadBloomFilter(ctx context.Context, segmentID in
}
func (loader *segmentLoaderV2) LoadSegment(ctx context.Context,
segment *LocalSegment,
seg Segment,
loadInfo *querypb.SegmentLoadInfo,
) (err error) {
segment := seg.(*LocalSegment)
// TODO: we should create a transaction-like api to load segment for segment interface,
// but not do many things in segment loader.
stateLockGuard, err := segment.StartLoadData()
@ -498,7 +497,7 @@ func (loader *segmentLoaderV2) LoadSegment(ctx context.Context,
}
func (loader *segmentLoaderV2) LoadLazySegment(ctx context.Context,
segment *LocalSegment,
segment Segment,
loadInfo *querypb.SegmentLoadInfo,
) (err error) {
return merr.ErrOperationNotSupported
@ -1087,9 +1086,13 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
}
func (loader *segmentLoader) LoadSegment(ctx context.Context,
segment *LocalSegment,
seg Segment,
loadInfo *querypb.SegmentLoadInfo,
) (err error) {
segment, ok := seg.(*LocalSegment)
if !ok {
return merr.WrapErrParameterInvalid("LocalSegment", fmt.Sprintf("%T", seg))
}
log := log.Ctx(ctx).With(
zap.Int64("collectionID", segment.Collection()),
zap.Int64("partitionID", segment.Partition()),
@ -1135,7 +1138,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context,
}
func (loader *segmentLoader) LoadLazySegment(ctx context.Context,
segment *LocalSegment,
segment Segment,
loadInfo *querypb.SegmentLoadInfo,
) (err error) {
resource, err := loader.requestResourceWithTimeout(ctx, loadInfo)
@ -1660,7 +1663,15 @@ func (loader *segmentLoader) getFieldType(collectionID, fieldID int64) (schemapb
return 0, merr.WrapErrFieldNotFound(fieldID)
}
func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo, version int64) error {
func (loader *segmentLoader) LoadIndex(ctx context.Context,
seg Segment,
loadInfo *querypb.SegmentLoadInfo,
version int64,
) error {
segment, ok := seg.(*LocalSegment)
if !ok {
return merr.WrapErrParameterInvalid("LocalSegment", fmt.Sprintf("%T", seg))
}
log := log.Ctx(ctx).With(
zap.Int64("collection", segment.Collection()),
zap.Int64("segment", segment.ID()),