Support get segment info without binlog path (#22741)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/22771/head
cai.zhang 2023-03-14 21:47:55 +08:00 committed by GitHub
parent 65f5400062
commit cb96314fe7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 856 additions and 433 deletions

View File

@ -684,6 +684,105 @@ func TestGetSegmentInfo(t *testing.T) {
})
}
func TestServer_ListSegmentsInfo(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segInfo := &datapb.SegmentInfo{
ID: 0,
State: commonpb.SegmentState_Flushed,
NumOfRows: 100,
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801),
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802),
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803),
},
},
},
},
}
err := svr.meta.AddSegment(NewSegmentInfo(segInfo))
assert.Nil(t, err)
req := &datapb.ListSegmentsInfoRequest{
SegmentIDs: []int64{0},
}
resp, err := svr.ListSegmentsInfo(svr.ctx, req)
assert.Equal(t, 1, len(resp.GetInfos()))
// Check that # of rows is corrected from 100 to 60.
assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows())
assert.Nil(t, err)
assert.Equal(t, 0, len(resp.GetInfos()[0].GetBinlogs()))
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("with wrong segment id", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segInfo := &datapb.SegmentInfo{
ID: 0,
State: commonpb.SegmentState_Flushed,
}
err := svr.meta.AddSegment(NewSegmentInfo(segInfo))
assert.Nil(t, err)
req := &datapb.ListSegmentsInfoRequest{
SegmentIDs: []int64{0, 1},
}
resp, err := svr.ListSegmentsInfo(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.ListSegmentsInfo(context.Background(), &datapb.ListSegmentsInfoRequest{
SegmentIDs: []int64{},
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.GetStatus().GetErrorCode())
})
t.Run("with dropped segment", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segInfo := &datapb.SegmentInfo{
ID: 0,
State: commonpb.SegmentState_Dropped,
}
err := svr.meta.AddSegment(NewSegmentInfo(segInfo))
assert.Nil(t, err)
req := &datapb.ListSegmentsInfoRequest{
SegmentIDs: []int64{0},
IncludeUnHealthy: false,
}
resp, err := svr.ListSegmentsInfo(svr.ctx, req)
assert.Nil(t, err)
assert.Equal(t, 0, len(resp.Infos))
req = &datapb.ListSegmentsInfoRequest{
SegmentIDs: []int64{0},
IncludeUnHealthy: true,
}
resp2, err := svr.ListSegmentsInfo(svr.ctx, req)
assert.Nil(t, err)
assert.Equal(t, 1, len(resp2.Infos))
})
}
func TestGetComponentStates(t *testing.T) {
svr := &Server{}
resp, err := svr.GetComponentStates(context.Background())

View File

@ -384,6 +384,39 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
return resp, nil
}
func (s *Server) ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
resp := &datapb.ListSegmentsInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if s.isClosed() {
setNotServingStatus(resp.Status, s.GetStateCode())
return resp, nil
}
infos := make([]*datapb.SegmentInfo, 0, len(req.GetSegmentIDs()))
for _, id := range req.SegmentIDs {
var info *SegmentInfo
var clonedInfo *SegmentInfo
info = s.meta.GetSegment(id)
if info == nil || (!req.GetIncludeUnHealthy() && !isSegmentHealthy(info)) {
log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
resp.Status.Reason = msgSegmentNotFound(id)
return resp, nil
}
clonedInfo = info.Clone()
segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo)
// don't return binlog
clonedInfo.Binlogs = nil
clonedInfo.Statslogs = nil
clonedInfo.Deltalogs = nil
infos = append(infos, clonedInfo.SegmentInfo)
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.Infos = infos
return resp, nil
}
// SaveBinlogPaths updates segment related binlog path
// works for Checkpoints and Flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {

View File

@ -809,3 +809,16 @@ func (c *Client) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest) (*
}
return ret.(*datapb.GcConfirmResponse), err
}
func (c *Client) ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.ListSegmentsInfo(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.ListSegmentsInfoResponse), err
}

View File

@ -153,6 +153,9 @@ func Test_NewClient(t *testing.T) {
r31, err := client.ShowConfigurations(ctx, nil)
retCheck(retNotNil, r31, err)
r32, err := client.ListSegmentsInfo(ctx, nil)
retCheck(retNotNil, r32, err)
{
ret, err := client.BroadcastAlteredCollection(ctx, nil)
retCheck(retNotNil, ret, err)

View File

@ -422,3 +422,7 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
return s.dataCoord.GcConfirm(ctx, request)
}
func (s *Server) ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
return s.dataCoord.ListSegmentsInfo(ctx, req)
}

View File

@ -71,6 +71,7 @@ type MockDataCoord struct {
unsetIsImportingStateResp *commonpb.Status
markSegmentsDroppedResp *commonpb.Status
broadCastResp *commonpb.Status
listSegmentsInfoResp *datapb.ListSegmentsInfoResponse
}
func (m *MockDataCoord) Init() error {
@ -237,6 +238,10 @@ func (m *MockDataCoord) CheckHealth(ctx context.Context, req *milvuspb.CheckHeal
}, nil
}
func (m *MockDataCoord) ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
return m.listSegmentsInfoResp, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
ctx := context.Background()
@ -557,6 +562,15 @@ func Test_NewServer(t *testing.T) {
assert.NotNil(t, resp)
})
t.Run("list segments info", func(t *testing.T) {
server.dataCoord = &MockDataCoord{
listSegmentsInfoResp: &datapb.ListSegmentsInfoResponse{},
}
resp, err := server.ListSegmentsInfo(ctx, nil)
assert.Nil(t, err)
assert.NotNil(t, resp)
})
t.Run("CheckHealth", func(t *testing.T) {
server.dataCoord = &MockDataCoord{}
ret, err := server.CheckHealth(ctx, nil)

View File

@ -652,6 +652,10 @@ func (m *MockDataCoord) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequ
return nil, nil
}
func (m *MockDataCoord) ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
return nil, nil
}
// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockProxy struct {
MockBase

View File

@ -206,7 +206,7 @@ func (hd *handoff) process(segID UniqueID) {
return
}
if state.state == commonpb.IndexState_Finished {
log.Ctx(hd.ctx).Debug("build index for segment success, write handoff event...", zap.Int64("segID", segID))
log.Ctx(hd.ctx).Info("build index for segment success, write handoff event...", zap.Int64("segID", segID))
info, err := hd.ic.pullSegmentInfo(hd.ctx, segID)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {

View File

@ -684,8 +684,8 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get
}, err
}
resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
SegmentIDs: flushSegments.Segments,
resp, err := i.dataCoordClient.ListSegmentsInfo(ctx, &datapb.ListSegmentsInfoRequest{
SegmentIDs: flushSegments.GetSegments(),
IncludeUnHealthy: true,
})
if err != nil {
@ -890,8 +890,8 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd
return ret, nil
}
resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
SegmentIDs: flushedSegmentR.Segments,
resp, err := i.dataCoordClient.ListSegmentsInfo(ctx, &datapb.ListSegmentsInfoRequest{
SegmentIDs: flushedSegmentR.GetSegments(),
IncludeUnHealthy: true,
})
if err != nil {

View File

@ -379,6 +379,7 @@ type DataCoordMock struct {
CallGetFlushedSegment func(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error)
CallAcquireSegmentLock func(ctx context.Context, req *datapb.AcquireSegmentLockRequest) (*commonpb.Status, error)
CallReleaseSegmentLock func(ctx context.Context, req *datapb.ReleaseSegmentLockRequest) (*commonpb.Status, error)
CallListSegmentsInfo func(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error)
}
func (dcm *DataCoordMock) Init() error {
@ -418,6 +419,10 @@ func (dcm *DataCoordMock) GetFlushedSegments(ctx context.Context, req *datapb.Ge
return dcm.CallGetFlushedSegment(ctx, req)
}
func (dcm *DataCoordMock) ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
return dcm.CallListSegmentsInfo(ctx, req)
}
func NewDataCoordMock() *DataCoordMock {
return &DataCoordMock{
CallInit: func() error {

View File

@ -185,6 +185,40 @@ func testIndexCoord(t *testing.T) {
ErrorCode: commonpb.ErrorCode_Success,
}, nil
},
CallListSegmentsInfo: func(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
segmentInfos := make([]*datapb.SegmentInfo, 0)
for _, segID := range req.SegmentIDs {
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10240,
State: commonpb.SegmentState_Flushed,
StartPosition: &internalpb.MsgPosition{
Timestamp: createTs,
},
Binlogs: []*datapb.FieldBinlog{
{
FieldID: fieldID,
Binlogs: []*datapb.Binlog{
{
LogPath: "file1",
},
{
LogPath: "file2",
},
},
},
},
})
}
return &datapb.ListSegmentsInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Infos: segmentInfos,
}, nil
},
}
err = ic.SetDataCoord(dcm)
assert.Nil(t, err)
@ -388,7 +422,7 @@ func testIndexCoord(t *testing.T) {
dcm.CallGetFlushedSegment = getFlushedSegmentsMock([]int64{111, 222, 333})
dcm.SetFunc(func() {
dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
dcm.CallListSegmentsInfo = func(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
return nil, errors.New("mock error")
}
})
@ -397,8 +431,8 @@ func testIndexCoord(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
dcm.SetFunc(func() {
dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return &datapb.GetSegmentInfoResponse{
dcm.CallListSegmentsInfo = func(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
return &datapb.ListSegmentsInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "mock fail",
@ -411,8 +445,8 @@ func testIndexCoord(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
dcm.SetFunc(func() {
dcm.CallGetSegmentInfo = func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return &datapb.GetSegmentInfoResponse{
dcm.CallListSegmentsInfo = func(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
return &datapb.ListSegmentsInfoResponse{
Infos: []*datapb.SegmentInfo{
{ID: 222, State: commonpb.SegmentState_Flushed, NumOfRows: 2048},
{ID: 333, State: commonpb.SegmentState_Flushed, NumOfRows: 2048},

View File

@ -221,7 +221,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
log.Info("IndexCoord get flushed segment from DataCoord success", zap.Int64("collectionID", cit.req.CollectionID),
zap.Int64s("flushed segments", flushedSegments.Segments))
segmentsInfo, err := cit.dataCoordClient.GetSegmentInfo(cit.ctx, &datapb.GetSegmentInfoRequest{
segmentsInfo, err := cit.dataCoordClient.ListSegmentsInfo(cit.ctx, &datapb.ListSegmentsInfoRequest{
SegmentIDs: flushedSegments.Segments,
IncludeUnHealthy: true,
})

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.15.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
@ -58,8 +58,8 @@ type DataCoord_AcquireSegmentLock_Call struct {
}
// AcquireSegmentLock is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.AcquireSegmentLockRequest
// - ctx context.Context
// - req *datapb.AcquireSegmentLockRequest
func (_e *DataCoord_Expecter) AcquireSegmentLock(ctx interface{}, req interface{}) *DataCoord_AcquireSegmentLock_Call {
return &DataCoord_AcquireSegmentLock_Call{Call: _e.mock.On("AcquireSegmentLock", ctx, req)}
}
@ -105,8 +105,8 @@ type DataCoord_AssignSegmentID_Call struct {
}
// AssignSegmentID is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.AssignSegmentIDRequest
// - ctx context.Context
// - req *datapb.AssignSegmentIDRequest
func (_e *DataCoord_Expecter) AssignSegmentID(ctx interface{}, req interface{}) *DataCoord_AssignSegmentID_Call {
return &DataCoord_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID", ctx, req)}
}
@ -152,8 +152,8 @@ type DataCoord_BroadcastAlteredCollection_Call struct {
}
// BroadcastAlteredCollection is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.AlterCollectionRequest
// - ctx context.Context
// - req *datapb.AlterCollectionRequest
func (_e *DataCoord_Expecter) BroadcastAlteredCollection(ctx interface{}, req interface{}) *DataCoord_BroadcastAlteredCollection_Call {
return &DataCoord_BroadcastAlteredCollection_Call{Call: _e.mock.On("BroadcastAlteredCollection", ctx, req)}
}
@ -199,8 +199,8 @@ type DataCoord_CheckHealth_Call struct {
}
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.CheckHealthRequest
// - ctx context.Context
// - req *milvuspb.CheckHealthRequest
func (_e *DataCoord_Expecter) CheckHealth(ctx interface{}, req interface{}) *DataCoord_CheckHealth_Call {
return &DataCoord_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, req)}
}
@ -246,8 +246,8 @@ type DataCoord_DropVirtualChannel_Call struct {
}
// DropVirtualChannel is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.DropVirtualChannelRequest
// - ctx context.Context
// - req *datapb.DropVirtualChannelRequest
func (_e *DataCoord_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *DataCoord_DropVirtualChannel_Call {
return &DataCoord_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)}
}
@ -293,8 +293,8 @@ type DataCoord_Flush_Call struct {
}
// Flush is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.FlushRequest
// - ctx context.Context
// - req *datapb.FlushRequest
func (_e *DataCoord_Expecter) Flush(ctx interface{}, req interface{}) *DataCoord_Flush_Call {
return &DataCoord_Flush_Call{Call: _e.mock.On("Flush", ctx, req)}
}
@ -340,8 +340,8 @@ type DataCoord_GcConfirm_Call struct {
}
// GcConfirm is a helper method to define mock.On call
// - ctx context.Context
// - request *datapb.GcConfirmRequest
// - ctx context.Context
// - request *datapb.GcConfirmRequest
func (_e *DataCoord_Expecter) GcConfirm(ctx interface{}, request interface{}) *DataCoord_GcConfirm_Call {
return &DataCoord_GcConfirm_Call{Call: _e.mock.On("GcConfirm", ctx, request)}
}
@ -387,8 +387,8 @@ type DataCoord_GetCollectionStatistics_Call struct {
}
// GetCollectionStatistics is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.GetCollectionStatisticsRequest
// - ctx context.Context
// - req *datapb.GetCollectionStatisticsRequest
func (_e *DataCoord_Expecter) GetCollectionStatistics(ctx interface{}, req interface{}) *DataCoord_GetCollectionStatistics_Call {
return &DataCoord_GetCollectionStatistics_Call{Call: _e.mock.On("GetCollectionStatistics", ctx, req)}
}
@ -434,8 +434,8 @@ type DataCoord_GetCompactionState_Call struct {
}
// GetCompactionState is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.GetCompactionStateRequest
// - ctx context.Context
// - req *milvuspb.GetCompactionStateRequest
func (_e *DataCoord_Expecter) GetCompactionState(ctx interface{}, req interface{}) *DataCoord_GetCompactionState_Call {
return &DataCoord_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState", ctx, req)}
}
@ -481,8 +481,8 @@ type DataCoord_GetCompactionStateWithPlans_Call struct {
}
// GetCompactionStateWithPlans is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.GetCompactionPlansRequest
// - ctx context.Context
// - req *milvuspb.GetCompactionPlansRequest
func (_e *DataCoord_Expecter) GetCompactionStateWithPlans(ctx interface{}, req interface{}) *DataCoord_GetCompactionStateWithPlans_Call {
return &DataCoord_GetCompactionStateWithPlans_Call{Call: _e.mock.On("GetCompactionStateWithPlans", ctx, req)}
}
@ -528,7 +528,7 @@ type DataCoord_GetComponentStates_Call struct {
}
// GetComponentStates is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *DataCoord_Expecter) GetComponentStates(ctx interface{}) *DataCoord_GetComponentStates_Call {
return &DataCoord_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates", ctx)}
}
@ -574,8 +574,8 @@ type DataCoord_GetFlushState_Call struct {
}
// GetFlushState is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.GetFlushStateRequest
// - ctx context.Context
// - req *milvuspb.GetFlushStateRequest
func (_e *DataCoord_Expecter) GetFlushState(ctx interface{}, req interface{}) *DataCoord_GetFlushState_Call {
return &DataCoord_GetFlushState_Call{Call: _e.mock.On("GetFlushState", ctx, req)}
}
@ -621,8 +621,8 @@ type DataCoord_GetFlushedSegments_Call struct {
}
// GetFlushedSegments is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.GetFlushedSegmentsRequest
// - ctx context.Context
// - req *datapb.GetFlushedSegmentsRequest
func (_e *DataCoord_Expecter) GetFlushedSegments(ctx interface{}, req interface{}) *DataCoord_GetFlushedSegments_Call {
return &DataCoord_GetFlushedSegments_Call{Call: _e.mock.On("GetFlushedSegments", ctx, req)}
}
@ -668,8 +668,8 @@ type DataCoord_GetInsertBinlogPaths_Call struct {
}
// GetInsertBinlogPaths is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.GetInsertBinlogPathsRequest
// - ctx context.Context
// - req *datapb.GetInsertBinlogPathsRequest
func (_e *DataCoord_Expecter) GetInsertBinlogPaths(ctx interface{}, req interface{}) *DataCoord_GetInsertBinlogPaths_Call {
return &DataCoord_GetInsertBinlogPaths_Call{Call: _e.mock.On("GetInsertBinlogPaths", ctx, req)}
}
@ -715,8 +715,8 @@ type DataCoord_GetMetrics_Call struct {
}
// GetMetrics is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.GetMetricsRequest
// - ctx context.Context
// - req *milvuspb.GetMetricsRequest
func (_e *DataCoord_Expecter) GetMetrics(ctx interface{}, req interface{}) *DataCoord_GetMetrics_Call {
return &DataCoord_GetMetrics_Call{Call: _e.mock.On("GetMetrics", ctx, req)}
}
@ -762,8 +762,8 @@ type DataCoord_GetPartitionStatistics_Call struct {
}
// GetPartitionStatistics is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.GetPartitionStatisticsRequest
// - ctx context.Context
// - req *datapb.GetPartitionStatisticsRequest
func (_e *DataCoord_Expecter) GetPartitionStatistics(ctx interface{}, req interface{}) *DataCoord_GetPartitionStatistics_Call {
return &DataCoord_GetPartitionStatistics_Call{Call: _e.mock.On("GetPartitionStatistics", ctx, req)}
}
@ -809,8 +809,8 @@ type DataCoord_GetRecoveryInfo_Call struct {
}
// GetRecoveryInfo is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.GetRecoveryInfoRequest
// - ctx context.Context
// - req *datapb.GetRecoveryInfoRequest
func (_e *DataCoord_Expecter) GetRecoveryInfo(ctx interface{}, req interface{}) *DataCoord_GetRecoveryInfo_Call {
return &DataCoord_GetRecoveryInfo_Call{Call: _e.mock.On("GetRecoveryInfo", ctx, req)}
}
@ -856,8 +856,8 @@ type DataCoord_GetSegmentInfo_Call struct {
}
// GetSegmentInfo is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.GetSegmentInfoRequest
// - ctx context.Context
// - req *datapb.GetSegmentInfoRequest
func (_e *DataCoord_Expecter) GetSegmentInfo(ctx interface{}, req interface{}) *DataCoord_GetSegmentInfo_Call {
return &DataCoord_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, req)}
}
@ -903,7 +903,7 @@ type DataCoord_GetSegmentInfoChannel_Call struct {
}
// GetSegmentInfoChannel is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *DataCoord_Expecter) GetSegmentInfoChannel(ctx interface{}) *DataCoord_GetSegmentInfoChannel_Call {
return &DataCoord_GetSegmentInfoChannel_Call{Call: _e.mock.On("GetSegmentInfoChannel", ctx)}
}
@ -949,8 +949,8 @@ type DataCoord_GetSegmentStates_Call struct {
}
// GetSegmentStates is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.GetSegmentStatesRequest
// - ctx context.Context
// - req *datapb.GetSegmentStatesRequest
func (_e *DataCoord_Expecter) GetSegmentStates(ctx interface{}, req interface{}) *DataCoord_GetSegmentStates_Call {
return &DataCoord_GetSegmentStates_Call{Call: _e.mock.On("GetSegmentStates", ctx, req)}
}
@ -996,8 +996,8 @@ type DataCoord_GetSegmentsByStates_Call struct {
}
// GetSegmentsByStates is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.GetSegmentsByStatesRequest
// - ctx context.Context
// - req *datapb.GetSegmentsByStatesRequest
func (_e *DataCoord_Expecter) GetSegmentsByStates(ctx interface{}, req interface{}) *DataCoord_GetSegmentsByStates_Call {
return &DataCoord_GetSegmentsByStates_Call{Call: _e.mock.On("GetSegmentsByStates", ctx, req)}
}
@ -1043,7 +1043,7 @@ type DataCoord_GetStatisticsChannel_Call struct {
}
// GetStatisticsChannel is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *DataCoord_Expecter) GetStatisticsChannel(ctx interface{}) *DataCoord_GetStatisticsChannel_Call {
return &DataCoord_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel", ctx)}
}
@ -1089,7 +1089,7 @@ type DataCoord_GetTimeTickChannel_Call struct {
}
// GetTimeTickChannel is a helper method to define mock.On call
// - ctx context.Context
// - ctx context.Context
func (_e *DataCoord_Expecter) GetTimeTickChannel(ctx interface{}) *DataCoord_GetTimeTickChannel_Call {
return &DataCoord_GetTimeTickChannel_Call{Call: _e.mock.On("GetTimeTickChannel", ctx)}
}
@ -1135,8 +1135,8 @@ type DataCoord_Import_Call struct {
}
// Import is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.ImportTaskRequest
// - ctx context.Context
// - req *datapb.ImportTaskRequest
func (_e *DataCoord_Expecter) Import(ctx interface{}, req interface{}) *DataCoord_Import_Call {
return &DataCoord_Import_Call{Call: _e.mock.On("Import", ctx, req)}
}
@ -1189,6 +1189,53 @@ func (_c *DataCoord_Init_Call) Return(_a0 error) *DataCoord_Init_Call {
return _c
}
// ListSegmentsInfo provides a mock function with given fields: ctx, req
func (_m *DataCoord) ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error) {
ret := _m.Called(ctx, req)
var r0 *datapb.ListSegmentsInfoResponse
if rf, ok := ret.Get(0).(func(context.Context, *datapb.ListSegmentsInfoRequest) *datapb.ListSegmentsInfoResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.ListSegmentsInfoResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *datapb.ListSegmentsInfoRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_ListSegmentsInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListSegmentsInfo'
type DataCoord_ListSegmentsInfo_Call struct {
*mock.Call
}
// ListSegmentsInfo is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.ListSegmentsInfoRequest
func (_e *DataCoord_Expecter) ListSegmentsInfo(ctx interface{}, req interface{}) *DataCoord_ListSegmentsInfo_Call {
return &DataCoord_ListSegmentsInfo_Call{Call: _e.mock.On("ListSegmentsInfo", ctx, req)}
}
func (_c *DataCoord_ListSegmentsInfo_Call) Run(run func(ctx context.Context, req *datapb.ListSegmentsInfoRequest)) *DataCoord_ListSegmentsInfo_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.ListSegmentsInfoRequest))
})
return _c
}
func (_c *DataCoord_ListSegmentsInfo_Call) Return(_a0 *datapb.ListSegmentsInfoResponse, _a1 error) *DataCoord_ListSegmentsInfo_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// ManualCompaction provides a mock function with given fields: ctx, req
func (_m *DataCoord) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
ret := _m.Called(ctx, req)
@ -1218,8 +1265,8 @@ type DataCoord_ManualCompaction_Call struct {
}
// ManualCompaction is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.ManualCompactionRequest
// - ctx context.Context
// - req *milvuspb.ManualCompactionRequest
func (_e *DataCoord_Expecter) ManualCompaction(ctx interface{}, req interface{}) *DataCoord_ManualCompaction_Call {
return &DataCoord_ManualCompaction_Call{Call: _e.mock.On("ManualCompaction", ctx, req)}
}
@ -1265,8 +1312,8 @@ type DataCoord_MarkSegmentsDropped_Call struct {
}
// MarkSegmentsDropped is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.MarkSegmentsDroppedRequest
// - ctx context.Context
// - req *datapb.MarkSegmentsDroppedRequest
func (_e *DataCoord_Expecter) MarkSegmentsDropped(ctx interface{}, req interface{}) *DataCoord_MarkSegmentsDropped_Call {
return &DataCoord_MarkSegmentsDropped_Call{Call: _e.mock.On("MarkSegmentsDropped", ctx, req)}
}
@ -1348,8 +1395,8 @@ type DataCoord_ReleaseSegmentLock_Call struct {
}
// ReleaseSegmentLock is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.ReleaseSegmentLockRequest
// - ctx context.Context
// - req *datapb.ReleaseSegmentLockRequest
func (_e *DataCoord_Expecter) ReleaseSegmentLock(ctx interface{}, req interface{}) *DataCoord_ReleaseSegmentLock_Call {
return &DataCoord_ReleaseSegmentLock_Call{Call: _e.mock.On("ReleaseSegmentLock", ctx, req)}
}
@ -1395,8 +1442,8 @@ type DataCoord_SaveBinlogPaths_Call struct {
}
// SaveBinlogPaths is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.SaveBinlogPathsRequest
// - ctx context.Context
// - req *datapb.SaveBinlogPathsRequest
func (_e *DataCoord_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *DataCoord_SaveBinlogPaths_Call {
return &DataCoord_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)}
}
@ -1442,8 +1489,8 @@ type DataCoord_SaveImportSegment_Call struct {
}
// SaveImportSegment is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.SaveImportSegmentRequest
// - ctx context.Context
// - req *datapb.SaveImportSegmentRequest
func (_e *DataCoord_Expecter) SaveImportSegment(ctx interface{}, req interface{}) *DataCoord_SaveImportSegment_Call {
return &DataCoord_SaveImportSegment_Call{Call: _e.mock.On("SaveImportSegment", ctx, req)}
}
@ -1489,8 +1536,8 @@ type DataCoord_SetSegmentState_Call struct {
}
// SetSegmentState is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.SetSegmentStateRequest
// - ctx context.Context
// - req *datapb.SetSegmentStateRequest
func (_e *DataCoord_Expecter) SetSegmentState(ctx interface{}, req interface{}) *DataCoord_SetSegmentState_Call {
return &DataCoord_SetSegmentState_Call{Call: _e.mock.On("SetSegmentState", ctx, req)}
}
@ -1536,8 +1583,8 @@ type DataCoord_ShowConfigurations_Call struct {
}
// ShowConfigurations is a helper method to define mock.On call
// - ctx context.Context
// - req *internalpb.ShowConfigurationsRequest
// - ctx context.Context
// - req *internalpb.ShowConfigurationsRequest
func (_e *DataCoord_Expecter) ShowConfigurations(ctx interface{}, req interface{}) *DataCoord_ShowConfigurations_Call {
return &DataCoord_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations", ctx, req)}
}
@ -1655,8 +1702,8 @@ type DataCoord_UnsetIsImportingState_Call struct {
}
// UnsetIsImportingState is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UnsetIsImportingStateRequest
// - ctx context.Context
// - req *datapb.UnsetIsImportingStateRequest
func (_e *DataCoord_Expecter) UnsetIsImportingState(ctx interface{}, req interface{}) *DataCoord_UnsetIsImportingState_Call {
return &DataCoord_UnsetIsImportingState_Call{Call: _e.mock.On("UnsetIsImportingState", ctx, req)}
}
@ -1702,8 +1749,8 @@ type DataCoord_UpdateChannelCheckpoint_Call struct {
}
// UpdateChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UpdateChannelCheckpointRequest
// - ctx context.Context
// - req *datapb.UpdateChannelCheckpointRequest
func (_e *DataCoord_Expecter) UpdateChannelCheckpoint(ctx interface{}, req interface{}) *DataCoord_UpdateChannelCheckpoint_Call {
return &DataCoord_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, req)}
}
@ -1749,8 +1796,8 @@ type DataCoord_UpdateSegmentStatistics_Call struct {
}
// UpdateSegmentStatistics is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest
// - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest
func (_e *DataCoord_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *DataCoord_UpdateSegmentStatistics_Call {
return &DataCoord_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)}
}
@ -1796,8 +1843,8 @@ type DataCoord_WatchChannels_Call struct {
}
// WatchChannels is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.WatchChannelsRequest
// - ctx context.Context
// - req *datapb.WatchChannelsRequest
func (_e *DataCoord_Expecter) WatchChannels(ctx interface{}, req interface{}) *DataCoord_WatchChannels_Call {
return &DataCoord_WatchChannels_Call{Call: _e.mock.On("WatchChannels", ctx, req)}
}

View File

@ -31,6 +31,7 @@ service DataCoord {
rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
rpc GetSegmentStates(GetSegmentStatesRequest) returns (GetSegmentStatesResponse) {}
rpc GetInsertBinlogPaths(GetInsertBinlogPathsRequest) returns (GetInsertBinlogPathsResponse) {}
rpc ListSegmentsInfo(ListSegmentsInfoRequest) returns (ListSegmentsInfoResponse) {}
rpc GetCollectionStatistics(GetCollectionStatisticsRequest) returns (GetCollectionStatisticsResponse) {}
rpc GetPartitionStatistics(GetPartitionStatisticsRequest) returns (GetPartitionStatisticsResponse) {}
@ -164,7 +165,7 @@ message GetSegmentStatesResponse {
message GetSegmentInfoRequest {
common.MsgBase base = 1;
repeated int64 segmentIDs = 2;
bool includeUnHealthy =3;
bool includeUnHealthy = 3;
}
message GetSegmentInfoResponse {
@ -173,6 +174,17 @@ message GetSegmentInfoResponse {
map<string, internal.MsgPosition> channel_checkpoint = 3;
}
message ListSegmentsInfoRequest {
common.MsgBase base = 1;
repeated int64 segmentIDs = 2;
bool includeUnHealthy = 3;
}
message ListSegmentsInfoResponse {
common.Status status = 1;
repeated SegmentInfo infos = 2;
}
message GetInsertBinlogPathsRequest {
common.MsgBase base = 1;
int64 segmentID = 2;

File diff suppressed because it is too large Load Diff

View File

@ -227,6 +227,15 @@ type DataCoord interface {
// error is returned only when some communication issue occurs
GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error)
// ListSegmentsInfo requests segment info without binlog path
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment ids to query
//
// response struct `ListSegmentsInfoResponse` contains the list of segment info
// error is returned only when some communication issue occurs
ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest) (*datapb.ListSegmentsInfoResponse, error)
// GetRecoveryInfo request segment recovery info of collection/partition
//
// ctx is the context to control request deadline and cancellation

View File

@ -175,5 +175,8 @@ func (m *GrpcDataCoordClient) MarkSegmentsDropped(context.Context, *datapb.MarkS
func (m *GrpcDataCoordClient) BroadcastAlteredCollection(ctx context.Context, in *datapb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
func (m *GrpcDataCoordClient) ListSegmentsInfo(ctx context.Context, req *datapb.ListSegmentsInfoRequest, opts ...grpc.CallOption) (*datapb.ListSegmentsInfoResponse, error) {
return &datapb.ListSegmentsInfoResponse{}, m.Err
}