Add datacoord services related unit tests (#7467)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/7475/head
congqixia 2021-09-06 11:12:42 +08:00 committed by GitHub
parent ccfb350bd1
commit 1f590d6c26
4 changed files with 396 additions and 56 deletions


@@ -101,6 +101,7 @@ func defaultAssignPolicy() channelAssignPolicy {
return newBalancedAssignPolicy()
}
// NewCluster creates a cluster with provided components
func NewCluster(ctx context.Context, kv kv.TxnKV, store ClusterStore,
posProvider positionProvider, opts ...ClusterOption) (*Cluster, error) {
ctx, cancel := context.WithCancel(ctx)


@@ -12,9 +12,11 @@ package datacoord
import (
"context"
"errors"
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/kv"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
@@ -57,14 +59,24 @@ func spyWatchPolicy(ch chan interface{}) channelAssignPolicy {
}
}
// a mock kv that always fails on LoadWithPrefix
type loadPrefixFailKv struct {
kv.TxnKV
}
// LoadWithPrefix overrides the embedded TxnKV behavior, always returning an error
func (kv *loadPrefixFailKv) LoadWithPrefix(key string) ([]string, []string, error) {
return []string{}, []string{}, errors.New("mocked fail")
}
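loadPrefixFailKv relies on Go's interface embedding: it embeds kv.TxnKV, so every method is forwarded to the wrapped value except the one it overrides. A minimal, self-contained sketch of the same technique (Store, mapStore, and failLoadStore are illustrative names, not part of this patch):

package main

import (
	"errors"
	"fmt"
)

// Store stands in for an interface such as kv.TxnKV.
type Store interface {
	Load(key string) (string, error)
}

// mapStore is a trivial in-memory implementation of Store.
type mapStore map[string]string

func (m mapStore) Load(key string) (string, error) {
	return m[key], nil
}

// failLoadStore embeds Store: every method is forwarded to the
// embedded value except Load, which is overridden to always fail.
type failLoadStore struct {
	Store
}

func (s *failLoadStore) Load(key string) (string, error) {
	return "", errors.New("mocked fail")
}

func main() {
	s := &failLoadStore{Store: mapStore{"k": "v"}}
	_, err := s.Load("k")
	fmt.Println(err) // prints: mocked fail
}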
func TestClusterCreate(t *testing.T) {
ch := make(chan interface{})
kv := memkv.NewMemoryKV()
memKv := memkv.NewMemoryKV()
spyClusterStore := &SpyClusterStore{
NodesInfo: NewNodesInfo(),
ch: ch,
}
cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{})
cluster, err := NewCluster(context.TODO(), memKv, spyClusterStore, dummyPosProvider{})
assert.Nil(t, err)
defer cluster.Close()
addr := "localhost:8080"
@@ -79,6 +91,13 @@ func TestClusterCreate(t *testing.T) {
dataNodes := cluster.GetNodes()
assert.EqualValues(t, 1, len(dataNodes))
assert.EqualValues(t, "localhost:8080", dataNodes[0].Info.GetAddress())
t.Run("loadKv Fails", func(t *testing.T) {
fkv := &loadPrefixFailKv{TxnKV: memKv}
cluster, err := NewCluster(context.TODO(), fkv, spyClusterStore, dummyPosProvider{})
assert.NotNil(t, err)
assert.Nil(t, cluster)
})
}
func TestRegister(t *testing.T) {


@@ -93,6 +93,25 @@ func TestAssignSegmentID(t *testing.T) {
assert.EqualValues(t, 1000, assign.Count)
})
t.Run("with closed server", func(t *testing.T) {
req := &datapb.SegmentIDRequest{
Count: 100,
ChannelName: channel0,
CollectionID: collID,
PartitionID: partID,
}
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.AssignSegmentID(context.Background(), &datapb.AssignSegmentIDRequest{
NodeID: 0,
PeerRole: "",
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
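The three assertions above recur in every closed-server case below. A hypothetical helper, not part of this patch, could bundle them; it assumes the caller extracts the *commonpb.Status from the response first:

func assertNotServing(t *testing.T, err error, status *commonpb.Status) {
	// a closed server is expected to return a nil transport error,
	// an UnexpectedError code, and the not-serving reason
	assert.Nil(t, err)
	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
	assert.Equal(t, serverNotServingErrMsg, status.GetReason())
}

With it, the case above would reduce to assertNotServing(t, err, resp.GetStatus()).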
t.Run("assign segment with invalid collection", func(t *testing.T) {
req := &datapb.SegmentIDRequest{
Count: 1000,
@@ -112,16 +131,6 @@
}
func TestFlush(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
allocations, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
expireTs := allocations[0].ExpireTime
segID := allocations[0].SegmentID
req := &datapb.FlushRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
@@ -132,13 +141,34 @@
DbID: 0,
CollectionID: 0,
}
resp, err := svr.Flush(context.TODO(), req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, segID, ids[0])
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
allocations, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
expireTs := allocations[0].ExpireTime
segID := allocations[0].SegmentID
resp, err := svr.Flush(context.TODO(), req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, segID, ids[0])
})
t.Run("closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.Flush(context.Background(), req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
//func TestGetComponentStates(t *testing.T) {
@@ -240,51 +270,292 @@ func TestGetSegmentStates(t *testing.T) {
}
})
}
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.GetSegmentStates(context.TODO(), &datapb.GetSegmentStatesRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
SegmentIDs: []int64{0},
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func TestGetInsertBinlogPaths(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
info := &datapb.SegmentInfo{
ID: 0,
}
svr.meta.AddSegment(NewSegmentInfo(info))
req := &datapb.GetInsertBinlogPathsRequest{
SegmentID: 0,
}
resp, err := svr.GetInsertBinlogPaths(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
info := &datapb.SegmentInfo{
ID: 0,
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []string{
"dev/datacoord/testsegment/1/part1",
"dev/datacoord/testsegment/1/part2",
},
},
},
}
svr.meta.AddSegment(NewSegmentInfo(info))
req := &datapb.GetInsertBinlogPathsRequest{
SegmentID: 0,
}
resp, err := svr.GetInsertBinlogPaths(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("with invalid segment id", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
info := &datapb.SegmentInfo{
ID: 0,
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []string{
"dev/datacoord/testsegment/1/part1",
"dev/datacoord/testsegment/1/part2",
},
},
},
}
svr.meta.AddSegment(NewSegmentInfo(info))
req := &datapb.GetInsertBinlogPathsRequest{
SegmentID: 1,
}
resp, err := svr.GetInsertBinlogPaths(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.GetInsertBinlogPaths(context.TODO(), &datapb.GetInsertBinlogPathsRequest{
SegmentID: 0,
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func TestGetCollectionStatistics(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
req := &datapb.GetCollectionStatisticsRequest{
CollectionID: 0,
}
resp, err := svr.GetCollectionStatistics(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
req := &datapb.GetCollectionStatisticsRequest{
CollectionID: 0,
}
resp, err := svr.GetCollectionStatistics(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.GetCollectionStatistics(context.Background(), &datapb.GetCollectionStatisticsRequest{
CollectionID: 0,
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func TestGetPartitionStatistics(t *testing.T) {
t.Run("normal cases", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
req := &datapb.GetPartitionStatisticsRequest{
CollectionID: 0,
PartitionID: 0,
}
resp, err := svr.GetPartitionStatistics(context.Background(), req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.GetPartitionStatistics(context.Background(), &datapb.GetPartitionStatisticsRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func TestGetSegmentInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segInfo := &datapb.SegmentInfo{
ID: 0,
}
svr.meta.AddSegment(NewSegmentInfo(segInfo))
segInfo := &datapb.SegmentInfo{
ID: 0,
}
svr.meta.AddSegment(NewSegmentInfo(segInfo))
req := &datapb.GetSegmentInfoRequest{
SegmentIDs: []int64{0},
req := &datapb.GetSegmentInfoRequest{
SegmentIDs: []int64{0},
}
resp, err := svr.GetSegmentInfo(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("with wrong segment id", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segInfo := &datapb.SegmentInfo{
ID: 0,
}
svr.meta.AddSegment(NewSegmentInfo(segInfo))
req := &datapb.GetSegmentInfoRequest{
SegmentIDs: []int64{0, 1},
}
resp, err := svr.GetSegmentInfo(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.GetSegmentInfo(context.Background(), &datapb.GetSegmentInfoRequest{
SegmentIDs: []int64{},
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func TestGetComponentStates(t *testing.T) {
svr := &Server{}
type testCase struct {
state ServerState
code internalpb.StateCode
}
resp, err := svr.GetSegmentInfo(svr.ctx, req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
cases := []testCase{
{state: ServerStateStopped, code: internalpb.StateCode_Abnormal},
{state: ServerStateInitializing, code: internalpb.StateCode_Initializing},
{state: ServerStateHealthy, code: internalpb.StateCode_Healthy},
}
for _, tc := range cases {
atomic.StoreInt64(&svr.isServing, tc.state)
resp, err := svr.GetComponentStates(context.Background())
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.Equal(t, tc.code, resp.GetState().GetStateCode())
}
}
func TestGetFlushedSegments(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
type testCase struct {
collID int64
partID int64
searchPartID int64
flushedSegments []int64
unflushedSegments []int64
expected []int64
}
cases := []testCase{
{
collID: 1,
partID: 1,
searchPartID: 1,
flushedSegments: []int64{1, 2, 3},
unflushedSegments: []int64{4},
expected: []int64{1, 2, 3},
},
{
collID: 1,
partID: 2,
searchPartID: 2,
flushedSegments: []int64{5, 6},
unflushedSegments: []int64{},
expected: []int64{5, 6},
},
{
collID: 2,
partID: 3,
searchPartID: 3,
flushedSegments: []int64{11, 12},
unflushedSegments: []int64{},
expected: []int64{11, 12},
},
{
collID: 1,
searchPartID: -1,
expected: []int64{1, 2, 3, 5, 6},
},
{
collID: 2,
searchPartID: -1,
expected: []int64{11, 12},
},
}
for _, tc := range cases {
for _, fs := range tc.flushedSegments {
segInfo := &datapb.SegmentInfo{
ID: fs,
CollectionID: tc.collID,
PartitionID: tc.partID,
State: commonpb.SegmentState_Flushed,
}
assert.Nil(t, svr.meta.AddSegment(NewSegmentInfo(segInfo)))
}
for _, us := range tc.unflushedSegments {
segInfo := &datapb.SegmentInfo{
ID: us,
CollectionID: tc.collID,
PartitionID: tc.partID,
State: commonpb.SegmentState_Growing,
}
assert.Nil(t, svr.meta.AddSegment(NewSegmentInfo(segInfo)))
}
resp, err := svr.GetFlushedSegments(context.Background(), &datapb.GetFlushedSegmentsRequest{
CollectionID: tc.collID,
PartitionID: tc.searchPartID,
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.ElementsMatch(t, tc.expected, resp.GetSegments())
}
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.GetFlushedSegments(context.Background(), &datapb.GetFlushedSegmentsRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func TestServer_GetMetrics(t *testing.T) {
@@ -522,6 +793,15 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.EqualValues(t, segmentInfo.DmlPosition.MsgID, []byte{1, 2, 3})
assert.EqualValues(t, segmentInfo.NumOfRows, 10)
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetReason())
})
}
func TestDataNodeTtChannel(t *testing.T) {
@@ -956,6 +1236,15 @@ func TestGetRecoveryInfo(t *testing.T) {
assert.EqualValues(t, 1, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetFieldID())
assert.ElementsMatch(t, []string{"/binlog/file1", "/binlog/file2"}, resp.GetBinlogs()[0].GetFieldBinlogs()[0].GetBinlogs())
})
t.Run("with closed server", func(t *testing.T) {
svr := newTestServer(t, nil)
closeTestServer(t, svr)
resp, err := svr.GetRecoveryInfo(context.TODO(), &datapb.GetRecoveryInfoRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
})
}
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {


@@ -17,10 +17,12 @@ import (
const serverNotServingErrMsg = "server is not serving"
// isClosed returns true if the server is not in the Healthy state
func (s *Server) isClosed() bool {
return atomic.LoadInt64(&s.isServing) != ServerStateHealthy
}
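Any state other than Healthy counts as closed. A sketch of the state plumbing this implies, assuming ServerState is an int64-backed enum accessed through sync/atomic (which is what the atomic.StoreInt64 calls in TestGetComponentStates above suggest); the constant values and the setState helper are illustrative, not the package's actual definitions:

// illustrative sketch, not the package's actual definitions
type ServerState = int64

const (
	ServerStateStopped ServerState = iota
	ServerStateInitializing
	ServerStateHealthy
)

// keeping the state in an int64 lets isClosed and its writers use
// sync/atomic instead of a mutex
func (s *Server) setState(state ServerState) {
	atomic.StoreInt64(&s.isServing, state)
}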
// GetTimeTickChannel is a legacy API that returns the time tick channel name
func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
@@ -30,6 +32,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRespon
}, nil
}
// GetStatisticsChannel is a legacy API that returns the statistics channel name
func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
@@ -39,6 +42,9 @@ func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
}, nil
}
// Flush notifies segments to flush
// this API only guarantees that the requested segments are sealed
// these segments will be flushed only after the flush policy is fulfilled
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
log.Debug("receive flush request", zap.Int64("dbID", req.GetDbID()), zap.Int64("collectionID", req.GetCollectionID()))
resp := &datapb.FlushResponse{
@@ -67,6 +73,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
return resp, nil
}
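A caller-side sketch of the sealed-versus-flushed semantics described above; the request mirrors the one built in TestFlush, and ctx/server are assumed to be in scope:

resp, err := server.Flush(ctx, &datapb.FlushRequest{
	Base: &commonpb.MsgBase{
		MsgType: commonpb.MsgType_Flush,
	},
	DbID:         0,
	CollectionID: 0,
})
if err == nil && resp.Status.ErrorCode == commonpb.ErrorCode_Success {
	// Success only means the requested segments are now sealed;
	// they are flushed later, once the flush policy is fulfilled,
	// so observe the eventual Flushed state via GetSegmentStates
}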
// AssignSegmentID applies for segment IDs and makes allocations for the requested records
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
if s.isClosed() {
return &datapb.AssignSegmentIDResponse{
@@ -130,6 +137,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
}, nil
}
// GetSegmentStates returns the states of the requested segments
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
resp := &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
@@ -162,6 +170,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
return resp, nil
}
// GetInsertBinlogPaths returns the binlog path info for the requested segments
func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
resp := &datapb.GetInsertBinlogPathsResponse{
Status: &commonpb.Status{
@@ -190,6 +199,8 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
return resp, nil
}
// GetCollectionStatistics returns statistics for a collection
// for now, only the row count is returned
func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
resp := &datapb.GetCollectionStatisticsResponse{
Status: &commonpb.Status{
@@ -206,6 +217,8 @@ func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol
return resp, nil
}
// GetPartitionStatistics returns statistics for a partition
// for now, only the row count is returned
func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
resp := &datapb.GetPartitionStatisticsResponse{
Status: &commonpb.Status{
@@ -222,6 +235,7 @@ func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart
return resp, nil
}
// GetSegmentInfoChannel is a legacy API that returns the segment info statistics channel name
func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
@@ -231,6 +245,7 @@ func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes
}, nil
}
// GetSegmentInfo returns the info of the requested segments, including status, row count, etc.
func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
resp := &datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{
@@ -255,6 +270,8 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
return resp, nil
}
// SaveBinlogPaths updates the binlog paths related to a segment
// it is used for both checkpoints and flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
resp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -314,6 +331,7 @@ func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentS
return resp, nil
}
// GetRecoveryInfo returns the recovery info for the requested collection and partition
func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
collectionID := req.GetCollectionID()
partitionID := req.GetPartitionID()
@@ -407,9 +425,23 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
return resp, nil
}
// GetFlushedSegments returns all segments that match the provided criteria and are in the Flushed state
// if the requested partition ID is less than 0, the partition ID filter is ignored
func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
resp := &datapb.GetFlushedSegmentsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
collectionID := req.GetCollectionID()
partitionID := req.GetPartitionID()
log.Debug("GetFlushedSegment",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID))
if s.isClosed() {
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
var segmentIDs []UniqueID
if partitionID < 0 {
segmentIDs = s.meta.GetSegmentsOfCollection(collectionID)
@@ -424,14 +456,13 @@ func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS
}
ret = append(ret, id)
}
return &datapb.GetFlushedSegmentsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Segments: ret,
}, nil
resp.Segments = ret
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
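A caller-side sketch of the negative-partition convention, mirroring the searchPartID: -1 cases in TestGetFlushedSegments above (ctx and server assumed in scope):

resp, err := server.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
	CollectionID: 1,
	// a partition ID below zero disables the partition filter, so all
	// flushed segments of collection 1 are returned
	PartitionID: -1,
})
// resp.GetSegments() then holds every flushed segment ID of the collection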
// GetMetrics returns DataCoord metrics info
// it may include SystemMetrics, Topology metrics, etc.
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.Debug("DataCoord.GetMetrics",
zap.Int64("node_id", Params.NodeID),