Replace manually composed gRPC calls with Broker methods (#27676)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/27640/head
congqixia 2023-10-13 09:55:34 +08:00 committed by GitHub
parent bf46ffd6c4
commit 82b2edc4bd
25 changed files with 822 additions and 606 deletions
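The pattern of the change, condensed: call sites that previously composed a coordinator gRPC request and interpreted the returned status by hand now call a method on the DataNode broker, which does both. A minimal before/after sketch based on the channel-checkpoint update further down in this diff (surrounding variable names are illustrative):

// Before: the caller composes the request and checks the response status itself.
resp, err := dataCoord.UpdateChannelCheckpoint(ctx, &datapb.UpdateChannelCheckpointRequest{
	Base:     commonpbutil.NewMsgBase(commonpbutil.WithSourceID(paramtable.GetNodeID())),
	VChannel: vChannelName,
	Position: channelPos,
})
if err := merr.CheckRPCCall(resp, err); err != nil {
	return err
}

// After: the broker hides request composition and status checking.
if err := broker.UpdateChannelCheckpoint(ctx, vChannelName, channelPos); err != nil {
	return err
}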


@ -306,8 +306,7 @@ func (_c *MockAllocator_Start_Call) RunAndReturn(run func() error) *MockAllocato
func NewMockAllocator(t interface {
mock.TestingT
Cleanup(func())
},
) *MockAllocator {
}) *MockAllocator {
mock := &MockAllocator{}
mock.Mock.Test(t)


@ -49,4 +49,6 @@ type DataCoord interface {
UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error
SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error
DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) error
UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error
SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) error
}
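The two new interface methods keep DataNode components free of gRPC plumbing: a caller builds only the protobuf payload and delegates the RPC and status handling to the broker. A hypothetical call site (this helper is illustrative and not part of the diff):

func reportSegmentStats(ctx context.Context, b broker.Broker, segmentID, numRows int64) error {
	// The broker issues the RPC and converts a failure status into an error.
	return b.UpdateSegmentStatistics(ctx, &datapb.UpdateSegmentStatisticsRequest{
		Stats: []*commonpb.SegmentStats{
			{SegmentID: segmentID, NumRows: numRows},
		},
	})
}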


@ -131,3 +131,27 @@ func (dc *dataCoordBroker) DropVirtualChannel(ctx context.Context, req *datapb.D
return nil
}
func (dc *dataCoordBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error {
log := log.Ctx(ctx)
resp, err := dc.client.UpdateSegmentStatistics(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to UpdateSegmentStatistics", zap.Error(err))
return err
}
return nil
}
func (dc *dataCoordBroker) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) error {
log := log.Ctx(ctx)
resp, err := dc.client.SaveImportSegment(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to UpdateSegmentStatistics", zap.Error(err))
return err
}
return nil
}
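Both methods lean on merr.CheckRPCCall to collapse the two failure modes of a coordinator call. A simplified sketch of that check for RPCs that return a bare *commonpb.Status (an assumption about the helper's behavior, not its actual implementation):

// checkStatusRPC mirrors what merr.CheckRPCCall does for status-only responses:
// surface the transport error first, then turn a non-success status into an error.
func checkStatusRPC(status *commonpb.Status, err error) error {
	if err != nil {
		return err
	}
	return merr.Error(status)
}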


@ -291,6 +291,85 @@ func (s *dataCoordSuite) TestDropVirtualChannel() {
})
}
func (s *dataCoordSuite) TestUpdateSegmentStatistics() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &datapb.UpdateSegmentStatisticsRequest{
Stats: []*commonpb.SegmentStats{
{}, {}, {},
},
}
s.Run("normal_case", func() {
s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).
Run(func(_ context.Context, r *datapb.UpdateSegmentStatisticsRequest, _ ...grpc.CallOption) {
s.Equal(len(req.GetStats()), len(r.GetStats()))
}).
Return(merr.Status(nil), nil)
err := s.broker.UpdateSegmentStatistics(ctx, req)
s.NoError(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
err := s.broker.UpdateSegmentStatistics(ctx, req)
s.Error(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).
Return(merr.Status(errors.New("mock")), nil)
err := s.broker.UpdateSegmentStatistics(ctx, req)
s.Error(err)
s.resetMock()
})
}
func (s *dataCoordSuite) TestSaveImportSegment() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
segmentID := int64(1001)
collectionID := int64(100)
req := &datapb.SaveImportSegmentRequest{
SegmentId: segmentID,
CollectionId: collectionID,
}
s.Run("normal_case", func() {
s.dc.EXPECT().SaveImportSegment(mock.Anything, mock.Anything).
Run(func(_ context.Context, r *datapb.SaveImportSegmentRequest, _ ...grpc.CallOption) {
s.Equal(collectionID, r.GetCollectionId())
s.Equal(segmentID, r.GetSegmentId())
}).
Return(merr.Status(nil), nil)
err := s.broker.SaveImportSegment(ctx, req)
s.NoError(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.dc.EXPECT().SaveImportSegment(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
err := s.broker.SaveImportSegment(ctx, req)
s.Error(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.dc.EXPECT().SaveImportSegment(mock.Anything, mock.Anything).
Return(merr.Status(errors.New("mock")), nil)
err := s.broker.SaveImportSegment(ctx, req)
s.Error(err)
s.resetMock()
})
}
func TestDataCoordBroker(t *testing.T) {
suite.Run(t, new(dataCoordSuite))
}


@ -440,6 +440,49 @@ func (_c *MockBroker_SaveBinlogPaths_Call) RunAndReturn(run func(context.Context
return _c
}
// SaveImportSegment provides a mock function with given fields: ctx, req
func (_m *MockBroker) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) error {
ret := _m.Called(ctx, req)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *datapb.SaveImportSegmentRequest) error); ok {
r0 = rf(ctx, req)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockBroker_SaveImportSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveImportSegment'
type MockBroker_SaveImportSegment_Call struct {
*mock.Call
}
// SaveImportSegment is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.SaveImportSegmentRequest
func (_e *MockBroker_Expecter) SaveImportSegment(ctx interface{}, req interface{}) *MockBroker_SaveImportSegment_Call {
return &MockBroker_SaveImportSegment_Call{Call: _e.mock.On("SaveImportSegment", ctx, req)}
}
func (_c *MockBroker_SaveImportSegment_Call) Run(run func(ctx context.Context, req *datapb.SaveImportSegmentRequest)) *MockBroker_SaveImportSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.SaveImportSegmentRequest))
})
return _c
}
func (_c *MockBroker_SaveImportSegment_Call) Return(_a0 error) *MockBroker_SaveImportSegment_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBroker_SaveImportSegment_Call) RunAndReturn(run func(context.Context, *datapb.SaveImportSegmentRequest) error) *MockBroker_SaveImportSegment_Call {
_c.Call.Return(run)
return _c
}
// ShowPartitions provides a mock function with given fields: ctx, dbName, collectionName
func (_m *MockBroker) ShowPartitions(ctx context.Context, dbName string, collectionName string) (map[string]int64, error) {
ret := _m.Called(ctx, dbName, collectionName)
@ -540,6 +583,49 @@ func (_c *MockBroker_UpdateChannelCheckpoint_Call) RunAndReturn(run func(context
return _c
}
// UpdateSegmentStatistics provides a mock function with given fields: ctx, req
func (_m *MockBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error {
ret := _m.Called(ctx, req)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *datapb.UpdateSegmentStatisticsRequest) error); ok {
r0 = rf(ctx, req)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockBroker_UpdateSegmentStatistics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegmentStatistics'
type MockBroker_UpdateSegmentStatistics_Call struct {
*mock.Call
}
// UpdateSegmentStatistics is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest
func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call {
return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)}
}
func (_c *MockBroker_UpdateSegmentStatistics_Call) Run(run func(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest)) *MockBroker_UpdateSegmentStatistics_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.UpdateSegmentStatisticsRequest))
})
return _c
}
func (_c *MockBroker_UpdateSegmentStatistics_Call) Return(_a0 error) *MockBroker_UpdateSegmentStatistics_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBroker_UpdateSegmentStatistics_Call) RunAndReturn(run func(context.Context, *datapb.UpdateSegmentStatisticsRequest) error) *MockBroker_UpdateSegmentStatistics_Call {
_c.Call.Return(run)
return _c
}
// NewMockBroker creates a new instance of MockBroker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockBroker(t interface {


@ -32,9 +32,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
@ -149,8 +149,8 @@ type addSegmentReq struct {
var _ Channel = &ChannelMeta{}
func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoordClient, cm storage.ChunkManager) *ChannelMeta {
metaService := newMetaService(rc, collID)
func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, broker broker.Broker, cm storage.ChunkManager) *ChannelMeta {
metaService := newMetaService(broker, collID)
channel := ChannelMeta{
collectionID: collID,


@ -33,22 +33,25 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
)
var channelMetaNodeTestDir = "/tmp/milvus_test/channel_meta"
func TestNewChannel(t *testing.T) {
rc := &RootCoordFactory{}
broker := broker.NewMockBroker(t)
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer cm.RemoveWithPrefix(context.Background(), cm.RootPath())
channel := newChannel("channel", 0, nil, rc, cm)
channel := newChannel("channel", 0, nil, broker, cm)
assert.NotNil(t, channel)
}
@ -110,17 +113,22 @@ func getSimpleFieldBinlog() *datapb.FieldBinlog {
func TestChannelMeta_InnerFunction(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
var (
broker = broker.NewMockBroker(t)
collID = UniqueID(1)
cm = storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
channel = newChannel("insert-01", collID, nil, rc, cm)
channel = newChannel("insert-01", collID, nil, broker, cm)
)
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
meta := NewMetaFactory().GetCollectionMeta(collID, "test_collection", schemapb.DataType_Int64)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil)
require.False(t, channel.hasSegment(0, true))
require.False(t, channel.hasSegment(0, false))
@ -218,15 +226,13 @@ func TestChannelMeta_getCollectionAndPartitionID(t *testing.T) {
func TestChannelMeta_segmentFlushed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
broker := broker.NewMockBroker(t)
collID := UniqueID(1)
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("Test coll mot match", func(t *testing.T) {
channel := newChannel("channel", collID, nil, rc, cm)
channel := newChannel("channel", collID, nil, broker, cm)
err := channel.addSegment(
context.TODO(),
addSegmentReq{
@ -287,9 +293,8 @@ func TestChannelMeta_segmentFlushed(t *testing.T) {
func TestChannelMeta_InterfaceMethod(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
broker := broker.NewMockBroker(t)
f := MetaFactory{}
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
@ -310,7 +315,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
channel := newChannel("a", test.channelCollID, nil, rc, cm)
channel := newChannel("a", test.channelCollID, nil, broker, cm)
if test.isvalid {
channel.addFlushedSegmentWithPKs(100, test.incollID, 10, 1, primaryKeyData)
@ -345,7 +350,14 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
channel := newChannel("a", test.channelCollID, nil, rc, cm)
broker.ExpectedCalls = nil
meta := NewMetaFactory().GetCollectionMeta(test.channelCollID, "test_collection", schemapb.DataType_Int64)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil)
channel := newChannel("a", test.channelCollID, nil, broker, cm)
require.False(t, channel.hasSegment(test.inSegID, true))
err := channel.addSegment(
context.TODO(),
@ -389,7 +401,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
channel := newChannel("a", test.channelCollID, nil, rc, &mockDataCM{})
channel := newChannel("a", test.channelCollID, nil, broker, &mockDataCM{})
require.False(t, channel.hasSegment(test.inSegID, true))
err := channel.addSegment(
context.TODO(),
@ -418,7 +430,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
})
t.Run("Test_addNormalSegmentWithNilDml", func(t *testing.T) {
channel := newChannel("a", 1, nil, rc, &mockDataCM{})
channel := newChannel("a", 1, nil, broker, &mockDataCM{})
segID := int64(101)
require.False(t, channel.hasSegment(segID, true))
assert.NotPanics(t, func() {
@ -453,13 +465,23 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
channel := newChannel("a", test.channelCollID, nil, rc, cm)
channel := newChannel("a", test.channelCollID, nil, broker, cm)
if test.metaServiceErr {
channel.collSchema = nil
rc.setCollectionID(-1)
broker.ExpectedCalls = nil
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
} else {
rc.setCollectionID(1)
meta := f.GetCollectionMeta(test.channelCollID, "test_collection", schemapb.DataType_Int64)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
CollectionID: test.channelCollID,
CollectionName: "test_collection",
ShardsNum: common.DefaultShardsNum,
Schema: meta.GetSchema(),
}, nil)
}
s, err := channel.getCollectionSchema(test.inputCollID, Timestamp(0))
@ -472,7 +494,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
}
})
}
rc.setCollectionID(1)
broker.ExpectedCalls = nil
})
t.Run("Test listAllSegmentIDs", func(t *testing.T) {
@ -525,7 +547,14 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
})
t.Run("Test_addSegmentMinIOLoadError", func(t *testing.T) {
channel := newChannel("a", 1, nil, rc, cm)
broker.ExpectedCalls = nil
meta := NewMetaFactory().GetCollectionMeta(1, "test_collection", schemapb.DataType_Int64)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil)
channel := newChannel("a", 1, nil, broker, cm)
channel.chunkManager = &mockDataCMError{}
err := channel.addSegment(
@ -555,7 +584,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
})
t.Run("Test_addSegmentStatsError", func(t *testing.T) {
channel := newChannel("insert-01", 1, nil, rc, cm)
channel := newChannel("insert-01", 1, nil, broker, cm)
channel.chunkManager = &mockDataCMStatsError{}
var err error
@ -586,7 +615,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
})
t.Run("Test_addSegmentPkfilterError", func(t *testing.T) {
channel := newChannel("insert-01", 1, nil, rc, cm)
channel := newChannel("insert-01", 1, nil, broker, cm)
channel.chunkManager = &mockPkfilterMergeError{}
var err error
@ -617,7 +646,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
})
t.Run("Test_mergeFlushedSegments", func(t *testing.T) {
channel := newChannel("channel", 1, nil, rc, cm)
channel := newChannel("channel", 1, nil, broker, cm)
primaryKeyData := &storage.Int64FieldData{
Data: []UniqueID{1},
@ -711,9 +740,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
func TestChannelMeta_loadStats(t *testing.T) {
f := &MetaFactory{}
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
broker := broker.NewMockBroker(t)
t.Run("list with merged stats log", func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_load_stats", schemapb.DataType_Int64)
@ -735,7 +762,7 @@ func TestChannelMeta_loadStats(t *testing.T) {
cm := &mockCm{}
channel := newChannel("channel", 1, meta.Schema, rc, cm)
channel := newChannel("channel", 1, meta.Schema, broker, cm)
channel.segments[seg1.segmentID] = seg1
channel.segments[seg2.segmentID] = seg2
@ -776,18 +803,24 @@ func TestChannelMeta_loadStats(t *testing.T) {
func TestChannelMeta_UpdatePKRange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
broker := broker.NewMockBroker(t)
collID := UniqueID(1)
partID := UniqueID(2)
chanName := "insert-02"
startPos := &msgpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(100)}
endPos := &msgpb.MsgPosition{ChannelName: chanName, Timestamp: Timestamp(200)}
meta := NewMetaFactory().GetCollectionMeta(collID, "test_collection", schemapb.DataType_Int64)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil)
cm := storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel("chanName", collID, nil, rc, cm)
channel := newChannel("chanName", collID, nil, broker, cm)
channel.chunkManager = &mockDataCM{}
err := channel.addSegment(
@ -836,9 +869,8 @@ func TestChannelMeta_UpdatePKRange(t *testing.T) {
func TestChannelMeta_ChannelCP(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
broker := broker.NewMockBroker(t)
mockVChannel := "fake-by-dev-rootcoord-dml-1-testchannelcp-v0"
mockPChannel := "fake-by-dev-rootcoord-dml-1"
@ -849,13 +881,19 @@ func TestChannelMeta_ChannelCP(t *testing.T) {
err := cm.RemoveWithPrefix(ctx, cm.RootPath())
assert.NoError(t, err)
}()
meta := NewMetaFactory().GetCollectionMeta(collID, "test_collection", schemapb.DataType_Int64)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil)
t.Run("get and set", func(t *testing.T) {
pos := &msgpb.MsgPosition{
ChannelName: mockPChannel,
Timestamp: 1000,
}
channel := newChannel(mockVChannel, collID, nil, rc, cm)
channel := newChannel(mockVChannel, collID, nil, broker, cm)
channel.chunkManager = &mockDataCM{}
position := channel.getChannelCheckpoint(pos)
assert.NotNil(t, position)
@ -869,7 +907,7 @@ func TestChannelMeta_ChannelCP(t *testing.T) {
ttPos, expectedPos *msgpb.MsgPosition,
) {
segmentID := UniqueID(1)
channel := newChannel(mockVChannel, collID, nil, rc, cm)
channel := newChannel(mockVChannel, collID, nil, broker, cm)
channel.chunkManager = &mockDataCM{}
err := channel.addSegment(
context.TODO(),
@ -952,12 +990,17 @@ type ChannelMetaSuite struct {
}
func (s *ChannelMetaSuite) SetupSuite() {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
broker := broker.NewMockBroker(s.T())
f := MetaFactory{}
meta := f.GetCollectionMeta(1, "testCollection", schemapb.DataType_Int64)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil).Maybe()
s.collID = 1
s.cm = storage.NewLocalChunkManager(storage.RootPath(channelMetaNodeTestDir))
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.channel = newChannel("channel", s.collID, nil, broker, s.cm)
s.vchanName = "channel"
}
@ -1075,13 +1118,18 @@ type ChannelMetaMockSuite struct {
}
func (s *ChannelMetaMockSuite) SetupTest() {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
broker := broker.NewMockBroker(s.T())
f := MetaFactory{}
meta := f.GetCollectionMeta(1, "testCollection", schemapb.DataType_Int64)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil).Maybe()
s.cm = mocks.NewChunkManager(s.T())
s.collID = 1
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.channel = newChannel("channel", s.collID, nil, broker, s.cm)
s.vchanName = "channel"
}


@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@ -34,12 +35,13 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -52,10 +54,18 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("Test getSegmentMeta", func(t *testing.T) {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
channel := newChannel("a", 1, nil, rc, cm)
f := MetaFactory{}
meta := f.GetCollectionMeta(1, "testCollection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, int64(1), mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
CollectionID: 1,
CollectionName: "testCollection",
Schema: meta.GetSchema(),
ShardsNum: common.DefaultShardsNum,
}, nil)
channel := newChannel("a", 1, nil, broker, cm)
var err error
task := &compactionTask{
@ -84,7 +94,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
assert.Equal(t, UniqueID(10), partID)
assert.NotNil(t, meta)
rc.setCollectionID(-2)
broker.ExpectedCalls = nil
broker.EXPECT().DescribeCollection(mock.Anything, int64(1), mock.Anything).
Return(nil, errors.New("mock"))
task.Channel.(*ChannelMeta).collSchema = nil
_, _, _, err = task.getSegmentMeta(100)
assert.Error(t, err)
@ -274,12 +286,13 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
collectionID := int64(1)
meta := NewMetaFactory().GetCollectionMeta(collectionID, "test", schemapb.DataType_Int64)
rc := &mocks.MockRootCoordClient{}
rc.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Schema: meta.GetSchema(),
}, nil)
channel := newChannel("a", collectionID, meta.GetSchema(), rc, nil)
}, nil).Maybe()
channel := newChannel("a", collectionID, meta.GetSchema(), broker, nil)
channel.segments[1] = &Segment{numRows: 10}
alloc := allocator.NewMockAllocator(t)
@ -620,13 +633,11 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("Test getNumRows error", func(t *testing.T) {
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
cm := &mockCm{}
broker := broker.NewMockBroker(t)
ct := &compactionTask{
Channel: newChannel("channel", 1, nil, rc, cm),
Channel: newChannel("channel", 1, nil, broker, cm),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
@ -801,20 +812,27 @@ func TestCompactorInterfaceMethods(t *testing.T) {
}
for _, c := range cases {
rc := &RootCoordFactory{
pkType: c.pkType,
}
collName := "test_compact_coll_name"
meta := NewMetaFactory().GetCollectionMeta(c.colID, collName, c.pkType)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
CollectionID: c.colID,
CollectionName: collName,
ShardsNum: common.DefaultShardsNum,
}, nil)
mockfm := &mockFlushManager{}
mockKv := memkv.NewMemoryKV()
mockbIO := &binlogIO{cm, alloc}
channel := newChannel("a", c.colID, nil, rc, cm)
channel := newChannel("a", c.colID, nil, broker, cm)
channel.addFlushedSegmentWithPKs(c.segID1, c.colID, c.parID, 2, c.iData1)
channel.addFlushedSegmentWithPKs(c.segID2, c.colID, c.parID, 2, c.iData2)
require.True(t, channel.hasSegment(c.segID1, true))
require.True(t, channel.hasSegment(c.segID2, true))
meta := NewMetaFactory().GetCollectionMeta(c.colID, "test_compact_coll_name", c.pkType)
iData1 := genInsertDataWithPKs(c.pks1, c.pkType)
dData1 := &DeleteData{
Pks: []primaryKey{c.pks1[0]},
@ -909,23 +927,31 @@ func TestCompactorInterfaceMethods(t *testing.T) {
// The merged segment 19530 should only contain 2 rows and both pk=2
// Both pk = 1 rows of the two segments are compacted.
var collID, partID, segID1, segID2 UniqueID = 1, 10, 200, 201
var collName string = "test_compact_coll_name"
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(19530), nil)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
CollectionID: collID,
CollectionName: collName,
ShardsNum: common.DefaultShardsNum,
}, nil)
mockfm := &mockFlushManager{}
mockbIO := &binlogIO{cm, alloc}
channel := newChannel("channelname", collID, nil, rc, cm)
channel := newChannel("channelname", collID, nil, broker, cm)
channel.addFlushedSegmentWithPKs(segID1, collID, partID, 2, &storage.Int64FieldData{Data: []UniqueID{1}})
channel.addFlushedSegmentWithPKs(segID2, collID, partID, 2, &storage.Int64FieldData{Data: []UniqueID{1}})
require.True(t, channel.hasSegment(segID1, true))
require.True(t, channel.hasSegment(segID2, true))
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name", schemapb.DataType_Int64)
// the same pk for segmentI and segmentII
pks := [2]primaryKey{newInt64PrimaryKey(1), newInt64PrimaryKey(2)}
iData1 := genInsertDataWithPKs(pks, schemapb.DataType_Int64)
@ -944,14 +970,14 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 0,
}
stats1 := storage.NewPrimaryKeyStats(1, int64(rc.pkType), 1)
stats1 := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), segID1, partID, iData1, stats1, 1, meta)
require.NoError(t, err)
dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), segID1, partID, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths1))
stats2 := storage.NewPrimaryKeyStats(1, int64(rc.pkType), 1)
stats2 := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), segID2, partID, iData2, stats2, 1, meta)
require.NoError(t, err)
dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), segID2, partID, dData2, meta)


@ -36,9 +36,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
@ -46,7 +46,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -96,6 +95,7 @@ type DataNode struct {
address string
rootCoord types.RootCoordClient
dataCoord types.DataCoordClient
broker broker.Broker
// call once
initOnce sync.Once
@ -232,6 +232,8 @@ func (node *DataNode) Init() error {
return
}
node.broker = broker.NewCoordBroker(node.rootCoord, node.dataCoord)
err := node.initRateCollector()
if err != nil {
log.Error("DataNode server init rateCollector failed", zap.Int64("node ID", paramtable.GetNodeID()), zap.Error(err))
@ -312,26 +314,27 @@ func (node *DataNode) Start() error {
}
log.Info("start id allocator done", zap.String("role", typeutil.DataNodeRole))
rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
Count: 1,
})
if err != nil || rep.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err))
startErr = errors.New("DataNode fail to alloc timestamp")
return
}
/*
rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
Count: 1,
})
if err != nil || rep.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err))
startErr = errors.New("DataNode fail to alloc timestamp")
return
}*/
connectEtcdFn := func() error {
etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
node.watchKv = etcdKV
return nil
}
err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))
err := retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))
if err != nil {
startErr = errors.New("DataNode fail to connect etcd")
return
@ -351,7 +354,7 @@ func (node *DataNode) Start() error {
go node.compactionExecutor.start(node.ctx)
if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
node.timeTickSender = newTimeTickSender(node.dataCoord, node.session.ServerID)
node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID)
go node.timeTickSender.start(node.ctx)
}


@ -26,10 +26,12 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
@ -102,6 +104,13 @@ func TestDataNode(t *testing.T) {
assert.Empty(t, node.GetAddress())
node.SetAddress("address")
assert.Equal(t, "address", node.GetAddress())
broker := &broker.MockBroker{}
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
node.broker = broker
defer node.Stop()
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode"))


@ -23,19 +23,15 @@ import (
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
)
@ -54,6 +50,7 @@ type dataSyncService struct {
fg *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages
broker broker.Broker
delBufferManager *DeltaBufferManager
flushManager flushManager // flush manager handles flush process
@ -67,7 +64,6 @@ type dataSyncService struct {
idAllocator allocator.Allocator // id/timestamp allocator
msFactory msgstream.Factory
dispClient msgdispatcher.Client
dataCoord types.DataCoordClient // DataCoord instance to interact with
chunkManager storage.ChunkManager
// test only
@ -146,7 +142,7 @@ func getChannelWithTickler(initCtx context.Context, node *DataNode, info *datapb
)
// init channel meta
channel := newChannel(channelName, collectionID, info.GetSchema(), node.rootCoord, node.chunkManager)
channel := newChannel(channelName, collectionID, info.GetSchema(), node.broker, node.chunkManager)
// tickler will update addSegment progress to watchInfo
futures := make([]*conc.Future[any], 0, len(unflushed)+len(flushed))
@ -224,7 +220,7 @@ func getChannelWithEtcdTickler(initCtx context.Context, node *DataNode, info *da
)
// init channel meta
channel := newChannel(channelName, collectionID, info.GetSchema(), node.rootCoord, node.chunkManager)
channel := newChannel(channelName, collectionID, info.GetSchema(), node.broker, node.chunkManager)
// tickler will update addSegment progress to watchInfo
tickler.watch()
@ -333,7 +329,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
dispClient: node.dispClient,
msFactory: node.factory,
dataCoord: node.dataCoord,
broker: node.broker,
idAllocator: config.allocator,
channel: config.channel,
@ -399,7 +395,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
return nil, err
}
ttNode, err := newTTNode(config, node.dataCoord)
ttNode, err := newTTNode(config, node.broker)
if err != nil {
return nil, err
}
@ -417,11 +413,11 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
// newServiceWithEtcdTickler stops and returns the initCtx.Err()
func newServiceWithEtcdTickler(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *etcdTickler) (*dataSyncService, error) {
// recover segment checkpoints
unflushedSegmentInfos, err := getSegmentInfos(initCtx, node.dataCoord, info.GetVchan().GetUnflushedSegmentIds())
unflushedSegmentInfos, err := node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds())
if err != nil {
return nil, err
}
flushedSegmentInfos, err := getSegmentInfos(initCtx, node.dataCoord, info.GetVchan().GetFlushedSegmentIds())
flushedSegmentInfos, err := node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds())
if err != nil {
return nil, err
}
@ -441,11 +437,11 @@ func newServiceWithEtcdTickler(initCtx context.Context, node *DataNode, info *da
// NOTE: compatible for event manager
func newDataSyncService(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, tickler *tickler) (*dataSyncService, error) {
// recover segment checkpoints
unflushedSegmentInfos, err := getSegmentInfos(initCtx, node.dataCoord, info.GetVchan().GetUnflushedSegmentIds())
unflushedSegmentInfos, err := node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetUnflushedSegmentIds())
if err != nil {
return nil, err
}
flushedSegmentInfos, err := getSegmentInfos(initCtx, node.dataCoord, info.GetVchan().GetFlushedSegmentIds())
flushedSegmentInfos, err := node.broker.GetSegmentInfo(initCtx, info.GetVchan().GetFlushedSegmentIds())
if err != nil {
return nil, err
}
@ -458,23 +454,3 @@ func newDataSyncService(initCtx context.Context, node *DataNode, info *datapb.Ch
return getServiceWithChannel(initCtx, node, info, channel, unflushedSegmentInfos, flushedSegmentInfos)
}
// getSegmentInfos return the SegmentInfo details according to the given ids through RPC to datacoord
// TODO: add a broker for the rpc
func getSegmentInfos(ctx context.Context, datacoord types.DataCoordClient, segmentIDs []int64) ([]*datapb.SegmentInfo, error) {
infoResp, err := datacoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
SegmentIDs: segmentIDs,
IncludeUnHealthy: true,
})
if err := merr.CheckRPCCall(infoResp, err); err != nil {
log.Error("Fail to get SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
return infoResp.Infos, nil
}
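With the helper above removed, segment recovery now goes through broker.GetSegmentInfo. That implementation lives in a broker file not shown in this excerpt; a sketch of what it presumably looks like, mirroring the deleted helper (an assumption, not the verbatim code):

func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error) {
	resp, err := dc.client.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
		),
		SegmentIDs:       segmentIDs,
		IncludeUnHealthy: true,
	})
	if err := merr.CheckRPCCall(resp, err); err != nil {
		log.Ctx(ctx).Warn("failed to GetSegmentInfo", zap.Error(err))
		return nil, err
	}
	return resp.GetInfos(), nil
}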


@ -27,21 +27,25 @@ import (
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
@ -191,6 +195,14 @@ func TestDataSyncService_Start(t *testing.T) {
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer node.chunkManager.RemoveWithPrefix(ctx, node.chunkManager.RootPath())
broker := broker.NewMockBroker(t)
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
node.broker = broker
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222),
func(count uint32) int64 {
@ -242,21 +254,28 @@ func TestDataSyncService_Start(t *testing.T) {
},
}
node.dataCoord.(*DataCoordFactory).UserSegmentInfo = map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Call.Return(
func(_ context.Context, segmentIDs []int64) []*datapb.SegmentInfo {
data := map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
1: {
ID: 1,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
}
1: {
ID: 1,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
}
return lo.FilterMap(segmentIDs, func(id int64, _ int) (*datapb.SegmentInfo, bool) {
item, ok := data[id]
return item, ok
})
}, nil)
sync, err := newServiceWithEtcdTickler(
ctx,
@ -354,6 +373,14 @@ func TestDataSyncService_Close(t *testing.T) {
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer node.chunkManager.RemoveWithPrefix(ctx, node.chunkManager.RootPath())
broker := broker.NewMockBroker(t)
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
node.broker = broker
ufs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
PartitionID: 1,
@ -389,28 +416,36 @@ func TestDataSyncService_Close(t *testing.T) {
}
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil).Maybe()
alloc.EXPECT().Alloc(mock.Anything).Call.Return(int64(22222),
func(count uint32) int64 {
return int64(22222 + count)
}, nil)
}, nil).Maybe()
node.allocator = alloc
node.dataCoord.(*DataCoordFactory).UserSegmentInfo = map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Call.Return(
func(_ context.Context, segmentIDs []int64) []*datapb.SegmentInfo {
data := map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
1: {
ID: 1,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
}
1: {
ID: 1,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
}
segments := lo.FilterMap(segmentIDs, func(id int64, _ int) (*datapb.SegmentInfo, bool) {
item, ok := data[id]
return item, ok
})
return segments
}, nil).Maybe()
// No Auto flush
paramtable.Get().Reset(Params.DataNodeCfg.FlushInsertBufferSize.Key)
@ -421,7 +456,7 @@ func TestDataSyncService_Close(t *testing.T) {
watchInfo,
genTestTickler(),
)
assert.NoError(t, err)
require.NoError(t, err)
assert.NotNil(t, syncService)
syncService.channel.(*ChannelMeta).syncPolicies = []segmentSyncPolicy{
syncMemoryTooHigh(),
@ -600,53 +635,26 @@ func TestBytesReader(t *testing.T) {
assert.Equal(t, int8(100), dataInt8)
}
func TestGetSegmentInfos(t *testing.T) {
dataCoord := &DataCoordFactory{}
ctx := context.Background()
segmentInfos, err := getSegmentInfos(ctx, dataCoord, []int64{1})
assert.NoError(t, err)
assert.Equal(t, 1, len(segmentInfos))
dataCoord.GetSegmentInfosError = true
segmentInfos2, err := getSegmentInfos(ctx, dataCoord, []int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos2)
dataCoord.GetSegmentInfosError = false
dataCoord.GetSegmentInfosNotSuccess = true
segmentInfos3, err := getSegmentInfos(ctx, dataCoord, []int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos3)
dataCoord.GetSegmentInfosError = false
dataCoord.GetSegmentInfosNotSuccess = false
dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{
5: {
ID: 100,
CollectionID: 101,
PartitionID: 102,
InsertChannel: "by-dev-rootcoord-dml-test_v1",
},
}
segmentInfos, err = getSegmentInfos(ctx, dataCoord, []int64{5})
assert.NoError(t, err)
assert.Equal(t, 1, len(segmentInfos))
assert.Equal(t, int64(100), segmentInfos[0].ID)
}
func TestClearGlobalFlushingCache(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dataCoord := &DataCoordFactory{}
meta := NewMetaFactory().GetCollectionMeta(1, "test_collection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
CollectionID: 1,
CollectionName: "test_collection",
Schema: meta.GetSchema(),
}, nil)
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel("channel", 1, nil, &RootCoordFactory{pkType: schemapb.DataType_Int64}, cm)
channel := newChannel("channel", 1, nil, broker, cm)
var err error
cache := newCache()
dsService := &dataSyncService{
dataCoord: dataCoord,
broker: broker,
channel: channel,
flushingSegCache: cache,
}
@ -723,6 +731,17 @@ func TestGetChannelWithTickler(t *testing.T) {
node.chunkManager = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer node.chunkManager.RemoveWithPrefix(context.Background(), node.chunkManager.RootPath())
meta := NewMetaFactory().GetCollectionMeta(1, "test_collection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
CollectionID: 1,
CollectionName: "test_collection",
Schema: meta.GetSchema(),
}, nil)
node.broker = broker
unflushed := []*datapb.SegmentInfo{
{
ID: 100,


@ -28,8 +28,10 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/etcd"
@ -60,6 +62,17 @@ func TestWatchChannel(t *testing.T) {
defer cancel()
broker := broker.NewMockBroker(t)
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
node.broker = broker
node.timeTickSender = newTimeTickSender(node.broker, 0)
t.Run("test watch channel", func(t *testing.T) {
kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid"


@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
@ -45,6 +46,7 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@ -80,13 +82,17 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
require.NoError(t, err)
Params.Save("etcd.rootPath", "/test/datanode/root")
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
collMeta := NewMetaFactory().GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: collMeta.GetSchema(),
}, nil).Maybe()
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).
Return(nil).Maybe()
channel := newChannel(insertChannelName, collMeta.ID, collMeta.Schema, mockRootCoord, cm)
channel := newChannel(insertChannelName, collMeta.ID, collMeta.Schema, broker, cm)
err = channel.addSegment(
context.TODO(),
addSegmentReq{
@ -118,8 +124,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
delBufHeap: &PriorityQueue{},
}
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
atimeTickSender := newTimeTickSender(broker, 0)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
assert.NotNil(t, iBNode)
require.NoError(t, err)
@ -179,11 +184,16 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: collMeta.GetSchema(),
}, nil).Maybe()
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).
Return(nil).Maybe()
channel := newChannel(insertChannelName, collMeta.ID, collMeta.Schema, mockRootCoord, cm)
channel := newChannel(insertChannelName, collMeta.ID, collMeta.Schema, broker, cm)
err = channel.addSegment(
context.TODO(),
@ -219,8 +229,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
delBufHeap: &PriorityQueue{},
}
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
atimeTickSender := newTimeTickSender(broker, 0)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
require.NoError(t, err)
@ -341,13 +350,17 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
require.NoError(t, err)
Params.Save("etcd.rootPath", "/test/datanode/root")
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
dataFactory := NewDataFactory()
collMeta := NewMetaFactory().GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: collMeta.GetSchema(),
}, nil).Maybe()
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).
Return(nil).Maybe()
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
dataFactory := NewDataFactory()
channel := &ChannelMeta{
collectionID: collMeta.ID,
@ -355,7 +368,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
isHighMemory: atomic.NewBool(false),
}
channel.metaService = newMetaService(mockRootCoord, collMeta.ID)
channel.metaService = newMetaService(broker, collMeta.ID)
factory := dependency.NewDefaultFactory(true)
@ -397,8 +410,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
channel: channel,
delBufHeap: &PriorityQueue{},
}
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
atimeTickSender := newTimeTickSender(broker, 0)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
require.NoError(t, err)
@ -587,13 +599,17 @@ func TestInsertBufferNodeRollBF(t *testing.T) {
require.NoError(t, err)
Params.Save("etcd.rootPath", "/test/datanode/root")
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
dataFactory := NewDataFactory()
collMeta := NewMetaFactory().GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: collMeta.GetSchema(),
}, nil).Maybe()
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).
Return(nil).Maybe()
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
dataFactory := NewDataFactory()
channel := &ChannelMeta{
collectionID: collMeta.ID,
@ -601,7 +617,7 @@ func TestInsertBufferNodeRollBF(t *testing.T) {
isHighMemory: atomic.NewBool(false),
}
channel.metaService = newMetaService(mockRootCoord, collMeta.ID)
channel.metaService = newMetaService(broker, collMeta.ID)
factory := dependency.NewDefaultFactory(true)
@ -644,8 +660,7 @@ func TestInsertBufferNodeRollBF(t *testing.T) {
delBufHeap: &PriorityQueue{},
}
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
atimeTickSender := newTimeTickSender(broker, 0)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
require.NoError(t, err)
@ -737,13 +752,18 @@ type InsertBufferNodeSuite struct {
func (s *InsertBufferNodeSuite) SetupSuite() {
insertBufferNodeTestDir := "/tmp/milvus_test/insert_buffer_node"
rc := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
collMeta := NewMetaFactory().GetCollectionMeta(1, "coll1", schemapb.DataType_Int64)
broker := broker.NewMockBroker(s.T())
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: collMeta.GetSchema(),
}, nil).Maybe()
s.collID = 1
s.partID = 10
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.channel = newChannel("channel", s.collID, nil, broker, s.cm)
s.delBufManager = &DeltaBufferManager{
channel: s.channel,
@ -971,7 +991,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
require.NoError(t, err)
Params.Save("etcd.rootPath", "/test/datanode/root")
Factory := &MetaFactory{}
factory := &MetaFactory{}
tests := []struct {
collID UniqueID
pkType schemapb.DataType
@ -984,16 +1004,18 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(insertNodeTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
for _, test := range tests {
collMeta := Factory.GetCollectionMeta(test.collID, "collection", test.pkType)
rcf := &RootCoordFactory{
pkType: test.pkType,
}
mockRootCoord := &CompactedRootCoord{
RootCoordClient: rcf,
compactTs: 100,
}
collMeta := factory.GetCollectionMeta(test.collID, "collection", test.pkType)
channel := newChannel(insertChannelName, collMeta.ID, collMeta.Schema, mockRootCoord, cm)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: collMeta.GetSchema(),
}, nil).Maybe()
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).
Return(nil).Maybe()
channel := newChannel(insertChannelName, collMeta.ID, collMeta.Schema, broker, cm)
err = channel.addSegment(
context.TODO(),
addSegmentReq{
@ -1025,8 +1047,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
delBufHeap: &PriorityQueue{},
}
dataCoord := &DataCoordFactory{}
atimeTickSender := newTimeTickSender(dataCoord, 0)
atimeTickSender := newTimeTickSender(broker, 0)
iBNode, err := newInsertBufferNode(ctx, flushChan, resendTTChan, delBufManager, fm, newCache(), atimeTickSender, c)
require.NoError(t, err)
@ -1062,7 +1083,8 @@ func TestInsertBufferNode_updateSegmentStates(te *testing.T) {
}
for _, test := range invalideTests {
channel := newChannel("channel", test.channelCollID, nil, &RootCoordFactory{pkType: schemapb.DataType_Int64}, cm)
broker := broker.NewMockBroker(te)
channel := newChannel("channel", test.channelCollID, nil, broker, cm)
ibNode := &insertBufferNode{
channel: channel,
@ -1171,7 +1193,17 @@ func TestInsertBufferNode_task_pool_is_full(t *testing.T) {
collection := UniqueID(0)
segmentID := UniqueID(100)
channel := newChannel(channelName, collection, nil, &RootCoordFactory{pkType: schemapb.DataType_Int64}, cm)
meta := NewMetaFactory().GetCollectionMeta(collection, "test_collection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
CollectionID: 1,
CollectionName: "test_collection",
Schema: meta.GetSchema(),
}, nil)
channel := newChannel(channelName, collection, nil, broker, cm)
err := channel.addSegment(
context.TODO(),
addSegmentReq{


@ -22,12 +22,16 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func TestFlowGraphManager(t *testing.T) {
@ -51,6 +55,23 @@ func TestFlowGraphManager(t *testing.T) {
err = node.Init()
require.Nil(t, err)
meta := NewMetaFactory().GetCollectionMeta(1, "test_collection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
CollectionID: 1,
CollectionName: "test_collection",
Schema: meta.GetSchema(),
}, nil)
node.broker = broker
fm := newFlowgraphManager()
defer func() {
fm.dropAll()

View File

@ -28,13 +28,9 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
@ -51,7 +47,7 @@ type ttNode struct {
vChannelName string
channel Channel
lastUpdateTime *atomic.Time
dataCoord types.DataCoordClient
broker broker.Broker
updateCPLock sync.Mutex
notifyChannel chan checkPoint
@ -124,16 +120,9 @@ func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Tim
// TODO: change to an etcd operation to avoid the datacoord call
ctx, cancel := context.WithTimeout(context.Background(), updateChanCPTimeout)
defer cancel()
resp, err := ttn.dataCoord.UpdateChannelCheckpoint(ctx, &datapb.UpdateChannelCheckpointRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
VChannel: ttn.vChannelName,
Position: channelPos,
})
if err = merr.CheckRPCCall(resp, err); err != nil {
log.Warn("UpdateChannelCheckpoint failed", zap.String("channel", ttn.vChannelName),
zap.Time("channelCPTs", channelCPTs), zap.Error(err))
err := ttn.broker.UpdateChannelCheckpoint(ctx, ttn.vChannelName, channelPos)
if err != nil {
return err
}
@ -149,7 +138,7 @@ func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Tim
return nil
}
func newTTNode(config *nodeConfig, dc types.DataCoordClient) (*ttNode, error) {
func newTTNode(config *nodeConfig, broker broker.Broker) (*ttNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32())
@ -159,7 +148,7 @@ func newTTNode(config *nodeConfig, dc types.DataCoordClient) (*ttNode, error) {
vChannelName: config.vChannelName,
channel: config.channel,
lastUpdateTime: atomic.NewTime(time.Time{}), // set to Zero to update channel checkpoint immediately after fg started
dataCoord: dc,
broker: broker,
notifyChannel: make(chan checkPoint, 1),
closeChannel: make(chan struct{}),
}
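
The ttNode hunk above replaces a hand-rolled RPC (compose the request, call DataCoord, decode the status) with a single broker call that returns a plain error. A minimal sketch of the pattern the broker method is assumed to encapsulate, built only from the pieces removed above; the helper name is hypothetical and not the broker package's actual code:

// Sketch only: what a broker-side wrapper for UpdateChannelCheckpoint is assumed to do,
// mirroring the request composition removed from ttNode above.
func updateChannelCheckpointViaClient(ctx context.Context, client types.DataCoordClient, vchannel string, pos *msgpb.MsgPosition) error {
	req := &datapb.UpdateChannelCheckpointRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
		),
		VChannel: vchannel,
		Position: pos,
	}
	resp, err := client.UpdateChannelCheckpoint(ctx, req)
	// collapse (status, err) into a single error, as the broker methods do
	return merr.CheckRPCCall(resp, err)
}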

View File

@ -819,16 +819,15 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl
req.Segments = segments
err := retry.Do(context.Background(), func() error {
resp, err := dsService.dataCoord.DropVirtualChannel(context.Background(), req)
// should be network issue, return error and retry
err = merr.CheckRPCCall(resp, err)
if errors.Is(err, merr.ErrChannelNotFound) {
log.Warn("skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName), zap.Error(err))
return nil
} else if err != nil {
err := dsService.broker.DropVirtualChannel(context.Background(), req)
if err != nil {
// meta error: this datanode is handling a virtual channel that no longer belongs to it
if errors.Is(err, merr.ErrChannelNotFound) {
log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName))
return nil
}
return err
}
dsService.channel.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID {
return pos.GetSegmentID()
}))
@ -910,14 +909,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
Channel: dsService.vchannelName,
}
err := retry.Do(context.Background(), func() error {
rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req)
// should be network issue, return error and retry
if err != nil {
return err
}
err = merr.Error(rsp)
err := dsService.broker.SaveBinlogPaths(context.Background(), req)
// Segment not found during stale segment flush; the segment might have been compacted already.
// Stop retrying and still proceed to the end, ignoring this error.
if !pack.flushed && errors.Is(err, merr.ErrSegmentNotFound) {
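
The two callbacks above keep their retry loops but now branch on merr sentinel errors returned by the broker instead of decoding response status codes. A compact, hedged sketch of that classification; the helper name and attempt count are illustrative, not from this commit:

// Illustrative helper: meta errors end the loop as a benign no-op,
// anything else is returned so retry.Do can try again.
func dropWithRetry(ctx context.Context, b broker.Broker, req *datapb.DropVirtualChannelRequest) error {
	return retry.Do(ctx, func() error {
		err := b.DropVirtualChannel(ctx, req)
		if errors.Is(err, merr.ErrChannelNotFound) {
			// meta error: the channel is already gone, nothing left to sync
			return nil
		}
		return err // nil on success; transient (network) errors get retried
	}, retry.Attempts(3))
}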

View File

@ -25,13 +25,15 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
@ -653,22 +655,28 @@ func TestRendezvousFlushManager_close(t *testing.T) {
}
func TestFlushNotifyFunc(t *testing.T) {
rcf := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
meta := NewMetaFactory().GetCollectionMeta(1, "testCollection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel("channel", 1, nil, rcf, cm)
channel := newChannel("channel", 1, nil, broker, cm)
dataCoord := &DataCoordFactory{}
flushingCache := newCache()
dsService := &dataSyncService{
collectionID: 1,
channel: channel,
dataCoord: dataCoord,
broker: broker,
flushingSegCache: flushingCache,
}
notifyFunc := flushNotifyFunc(dsService, retry.Attempts(1))
@ -693,14 +701,18 @@ func TestFlushNotifyFunc(t *testing.T) {
})
t.Run("datacoord save fails", func(t *testing.T) {
dataCoord.SaveBinlogPathStatus = merr.Status(merr.WrapErrCollectionNotFound("collection"))
broker.ExpectedCalls = nil
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).
Return(merr.WrapErrCollectionNotFound("test_collection"))
assert.Panics(t, func() {
notifyFunc(&segmentFlushPack{})
})
})
t.Run("stale segment not found", func(t *testing.T) {
dataCoord.SaveBinlogPathStatus = merr.Status(merr.WrapErrSegmentNotFound(100))
broker.ExpectedCalls = nil
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).
Return(merr.WrapErrSegmentNotFound(0))
assert.NotPanics(t, func() {
notifyFunc(&segmentFlushPack{flushed: false})
})
@ -709,14 +721,18 @@ func TestFlushNotifyFunc(t *testing.T) {
// issue https://github.com/milvus-io/milvus/issues/17097
// meta error, datanode shall not panic, just drop the virtual channel
t.Run("datacoord found meta error", func(t *testing.T) {
dataCoord.SaveBinlogPathStatus = merr.Status(merr.WrapErrChannelNotFound("channel"))
broker.ExpectedCalls = nil
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).
Return(merr.WrapErrChannelNotFound("channel"))
assert.NotPanics(t, func() {
notifyFunc(&segmentFlushPack{})
})
})
t.Run("datacoord call error", func(t *testing.T) {
dataCoord.SaveBinlogPathError = true
broker.ExpectedCalls = nil
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).
Return(errors.New("mock"))
assert.Panics(t, func() {
notifyFunc(&segmentFlushPack{})
})
@ -724,9 +740,15 @@ func TestFlushNotifyFunc(t *testing.T) {
}
func TestDropVirtualChannelFunc(t *testing.T) {
rcf := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
meta := NewMetaFactory().GetCollectionMeta(1, "testCollection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil)
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil).Maybe()
vchanName := "vchan_01"
ctx, cancel := context.WithCancel(context.Background())
@ -734,14 +756,13 @@ func TestDropVirtualChannelFunc(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(flushTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel(vchanName, 1, nil, rcf, cm)
channel := newChannel(vchanName, 1, nil, broker, cm)
dataCoord := &DataCoordFactory{}
flushingCache := newCache()
dsService := &dataSyncService{
collectionID: 1,
channel: channel,
dataCoord: dataCoord,
broker: broker,
flushingSegCache: flushingCache,
vchannelName: vchanName,
}
@ -787,16 +808,10 @@ func TestDropVirtualChannelFunc(t *testing.T) {
})
})
})
t.Run("datacoord drop fails", func(t *testing.T) {
dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_UnexpectedError
assert.Panics(t, func() {
dropFunc(nil)
})
})
t.Run("datacoord call error", func(t *testing.T) {
dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_UnexpectedError
dataCoord.DropVirtualChannelError = true
t.Run("datacoord_return_error", func(t *testing.T) {
broker.ExpectedCalls = nil
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).
Return(errors.New("mock"))
assert.Panics(t, func() {
dropFunc(nil)
})
@ -804,9 +819,10 @@ func TestDropVirtualChannelFunc(t *testing.T) {
// issue https://github.com/milvus-io/milvus/issues/17097
// meta error, datanode shall not panic, just drop the virtual channel
t.Run("datacoord found meta error", func(t *testing.T) {
dataCoord.DropVirtualChannelStatus = commonpb.ErrorCode_MetaFailed
dataCoord.DropVirtualChannelError = false
t.Run("datacoord_return_channel_not_found", func(t *testing.T) {
broker.ExpectedCalls = nil
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).
Return(merr.WrapErrChannelNotFound("channel"))
assert.NotPanics(t, func() {
dropFunc(nil)
})

View File

@ -22,30 +22,25 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// metaService initializes the channel collection in the data node from root coord.
// Initializing the channel collection happens when the data node starts. It depends on
// a healthy root coord and a valid root coord grpc client.
type metaService struct {
channel Channel
collectionID UniqueID
rootCoord types.RootCoordClient
broker broker.Broker
}
// newMetaService creates a new metaService with the provided broker and collectionID.
func newMetaService(rc types.RootCoordClient, collectionID UniqueID) *metaService {
func newMetaService(broker broker.Broker, collectionID UniqueID) *metaService {
return &metaService{
rootCoord: rc,
broker: broker,
collectionID: collectionID,
}
}
@ -53,34 +48,17 @@ func newMetaService(rc types.RootCoordClient, collectionID UniqueID) *metaServic
// getCollectionSchema get collection schema with provided collection id at specified timestamp.
func (mService *metaService) getCollectionSchema(ctx context.Context, collID UniqueID, timestamp Timestamp) (*schemapb.CollectionSchema, error) {
response, err := mService.getCollectionInfo(ctx, collID, timestamp)
if response != nil {
return response.GetSchema(), err
if err != nil {
return nil, err
}
return nil, err
return response.GetSchema(), nil
}
// getCollectionInfo get collection info with provided collection id at specified timestamp.
func (mService *metaService) getCollectionInfo(ctx context.Context, collID UniqueID, timestamp Timestamp) (*milvuspb.DescribeCollectionResponse, error) {
req := &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithMsgID(0), // GOOSE TODO
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
CollectionID: collID,
TimeStamp: timestamp,
}
response, err := mService.rootCoord.DescribeCollectionInternal(ctx, req)
response, err := mService.broker.DescribeCollection(ctx, collID, timestamp)
if err != nil {
log.Error("grpc error when describe", zap.Int64("collectionID", collID), zap.Error(err))
return nil, err
}
if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err := merr.Error(response.Status)
log.Error("describe collection from rootcoord failed", zap.Int64("collectionID", collID), zap.Error(err))
log.Error("failed to describe collection from rootcoord", zap.Int64("collectionID", collID), zap.Error(err))
return nil, err
}
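
After this change, getCollectionInfo/getCollectionSchema surface a single error from broker.DescribeCollection, so callers no longer inspect the response status themselves. A small usage sketch; the wrapper function is hypothetical and the timestamp 0 is passed the same way as in the Import path:

// Illustrative wrapper (not part of the commit) around the refactored metaService.
func loadCollectionSchema(ctx context.Context, b broker.Broker, collID UniqueID) (*schemapb.CollectionSchema, error) {
	ms := newMetaService(b, collID)
	return ms.getCollectionSchema(ctx, collID, 0)
}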

View File

@ -22,11 +22,14 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/pkg/util/merr"
)
const (
@ -40,12 +43,15 @@ func TestMetaService_All(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mFactory := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
mFactory.setCollectionID(collectionID0)
mFactory.setCollectionName(collectionName0)
ms := newMetaService(mFactory, collectionID0)
meta := NewMetaFactory().GetCollectionMeta(collectionID0, collectionName0, schemapb.DataType_Int64)
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
}, nil).Maybe()
ms := newMetaService(broker, collectionID0)
t.Run("Test getCollectionSchema", func(t *testing.T) {
sch, err := ms.getCollectionSchema(ctx, collectionID0, 0)
@ -89,17 +95,11 @@ func TestMetaServiceRootCoodFails(t *testing.T) {
rc.setCollectionID(collectionID0)
rc.setCollectionName(collectionName0)
ms := newMetaService(rc, collectionID0)
_, err := ms.getCollectionSchema(context.Background(), collectionID1, 0)
assert.Error(t, err)
})
broker := broker.NewMockBroker(t)
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
t.Run("Test Describe wit nil response", func(t *testing.T) {
rc := &RootCoordFails2{}
rc.setCollectionID(collectionID0)
rc.setCollectionName(collectionName0)
ms := newMetaService(rc, collectionID0)
ms := newMetaService(broker, collectionID0)
_, err := ms.getCollectionSchema(context.Background(), collectionID1, 0)
assert.Error(t, err)
})

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -32,6 +33,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -82,17 +84,12 @@ func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNod
node.SetSession(&sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}})
node.dispClient = msgdispatcher.NewClient(factory, typeutil.DataNodeRole, paramtable.GetNodeID())
rc := &RootCoordFactory{
ID: 0,
collectionID: 1,
collectionName: "collection-1",
pkType: pkType,
}
node.rootCoord = rc
broker := &broker.MockBroker{}
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
ds := &DataCoordFactory{}
node.dataCoord = ds
node.timeTickSender = newTimeTickSender(node.dataCoord, 0)
node.broker = broker
node.timeTickSender = newTimeTickSender(node.broker, 0)
return node
}

View File

@ -464,24 +464,13 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
// get a timestamp for all the rows
// Ignore cancellation from parent context.
rep, err := node.rootCoord.AllocTimestamp(newCtx, &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
commonpbutil.WithMsgID(0),
commonpbutil.WithTimeStamp(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
Count: 1,
})
if rep.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success || err != nil {
ts, _, err := node.broker.AllocTimestamp(newCtx, 1)
if err != nil {
return returnFailFunc("DataNode alloc ts failed", err)
}
ts := rep.GetTimestamp()
// get collection schema and shard number
metaService := newMetaService(node.rootCoord, req.GetImportTask().GetCollectionId())
metaService := newMetaService(node.broker, req.GetImportTask().GetCollectionId())
colInfo, err := metaService.getCollectionInfo(newCtx, req.GetImportTask().GetCollectionId(), 0)
if err != nil {
return returnFailFunc("failed to get collection info for collection ID", err)
@ -495,7 +484,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
}
// TODO: prefer to set partitionIDs in coord instead of getting them here.
// the colInfo doesn't have a correct database name (it is empty); use the database name passed from rootcoord.
partitions, err := node.getPartitions(ctx, req.GetImportTask().GetDatabaseName(), colInfo.GetCollectionName())
partitions, err := node.broker.ShowPartitions(ctx, req.GetImportTask().GetDatabaseName(), colInfo.GetCollectionName())
if err != nil {
return returnFailFunc("failed to get partition id list", err)
}
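
node.getPartitions is superseded by broker.ShowPartitions, which returns the partition name-to-ID map directly; the validation that names and IDs pair up is assumed to move behind the broker. A sketch of that mapping, mirroring the helper removed further below; the real code lives in the broker package and may differ:

// Sketch of the name->ID mapping the broker method is assumed to return.
func partitionsToMap(names []string, ids []int64) (map[string]int64, error) {
	if len(names) != len(ids) {
		return nil, fmt.Errorf("partition names and ids are unequal, number of names: %d, number of ids: %d",
			len(names), len(ids))
	}
	partitions := make(map[string]int64, len(names))
	for i, name := range names {
		partitions[name] = ids[i]
	}
	return partitions, nil
}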
@ -560,46 +549,6 @@ func (node *DataNode) FlushChannels(ctx context.Context, req *datapb.FlushChanne
return merr.Success(), nil
}
func (node *DataNode) getPartitions(ctx context.Context, dbName string, collectionName string) (map[string]int64, error) {
req := &milvuspb.ShowPartitionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
),
DbName: dbName,
CollectionName: collectionName,
}
logFields := []zap.Field{
zap.String("dbName", dbName),
zap.String("collectionName", collectionName),
}
resp, err := node.rootCoord.ShowPartitions(ctx, req)
if err == nil {
err = merr.Error(resp.GetStatus())
}
if err != nil {
logFields = append(logFields, zap.Error(err))
log.Warn("failed to get partitions of collection", logFields...)
return nil, err
}
partitionNames := resp.GetPartitionNames()
partitionIDs := resp.GetPartitionIDs()
if len(partitionNames) != len(partitionIDs) {
logFields = append(logFields, zap.Int("number of names", len(partitionNames)), zap.Int("number of ids", len(partitionIDs)))
log.Warn("partition names and ids are unequal", logFields...)
return nil, fmt.Errorf("partition names and ids are unequal, number of names: %d, number of ids: %d",
len(partitionNames), len(partitionIDs))
}
partitions := make(map[string]int64)
for i := 0; i < len(partitionNames); i++ {
partitions[partitionNames[i]] = partitionIDs[i]
}
return partitions, nil
}
// AddImportSegment adds the import segment to the current DataNode.
func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) {
logFields := []zap.Field{
@ -741,18 +690,16 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
logFields = append(logFields, zap.Int64("collection ID", colID))
logFields = append(logFields, zap.String("target channel name", targetChName))
log.Info("assign segment for the import task", logFields...)
resp, err := node.dataCoord.AssignSegmentID(context.Background(), segmentIDReq)
ids, err := node.broker.AssignSegmentID(context.Background(), segmentIDReq.GetSegmentIDRequests()...)
if err != nil {
return 0, "", fmt.Errorf("syncSegmentID Failed:%w", err)
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return 0, "", fmt.Errorf("syncSegmentID Failed:%s", resp.GetStatus().GetReason())
}
if len(resp.SegIDAssignments) == 0 || resp.SegIDAssignments[0] == nil {
return 0, "", fmt.Errorf("syncSegmentID Failed: the collection was dropped")
return 0, "", errors.Wrap(err, "failed to AssignSegmentID")
}
segmentID := resp.SegIDAssignments[0].SegID
if len(ids) == 0 {
return 0, "", merr.WrapErrSegmentNotFound(0, "failed to assign segment id")
}
segmentID := ids[0]
logFields = append(logFields, zap.Int64("segmentID", segmentID))
log.Info("new segment assigned", logFields...)
@ -831,7 +778,7 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
err := retry.Do(context.Background(), func() error {
// Ask DataCoord to save binlog path and add segment to the corresponding DataNode flow graph.
resp, err := node.dataCoord.SaveImportSegment(context.Background(), &datapb.SaveImportSegmentRequest{
err := node.broker.SaveImportSegment(context.Background(), &datapb.SaveImportSegmentRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithTimeStamp(ts), // Pass current timestamp downstream.
commonpbutil.WithSourceID(paramtable.GetNodeID()),
@ -867,12 +814,10 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
})
// Only retrying when DataCoord is unhealthy or err != nil, otherwise return immediately.
if err != nil {
return fmt.Errorf(err.Error())
}
if resp.ErrorCode != commonpb.ErrorCode_Success && resp.ErrorCode != commonpb.ErrorCode_NotReadyServe {
return retry.Unrecoverable(fmt.Errorf("failed to save import segment, reason = %s", resp.Reason))
} else if resp.ErrorCode == commonpb.ErrorCode_NotReadyServe {
return fmt.Errorf("failed to save import segment: %s", resp.GetReason())
if errors.Is(err, merr.ErrServiceNotReady) {
return retry.Unrecoverable(err)
}
return err
}
return nil
})
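
saveSegmentFunc still runs under retry.Do, but the broker error is now classified once: ErrServiceNotReady aborts the loop via retry.Unrecoverable, everything else is retried. A stripped-down sketch of that shape; the request construction is omitted and call/opts stand in for the broker.SaveImportSegment invocation and retry options above:

// Sketch: error classification inside the retry closure.
err := retry.Do(ctx, func() error {
	if err := call(); err != nil {
		if errors.Is(err, merr.ErrServiceNotReady) {
			return retry.Unrecoverable(err) // terminal: stop retrying
		}
		return err // transient: retry.Do will attempt again
	}
	return nil
}, opts...)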
@ -924,13 +869,15 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp,
Data: tsFieldData,
}
if status, _ := node.dataCoord.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{
Stats: []*commonpb.SegmentStats{{
SegmentID: segmentID,
NumRows: int64(rowNum),
}},
}); status.GetErrorCode() != commonpb.ErrorCode_Success {
return nil, nil, fmt.Errorf(status.GetReason())
if err := node.broker.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{
Stats: []*commonpb.SegmentStats{
{
SegmentID: segmentID,
NumRows: int64(rowNum),
},
},
}); err != nil {
return nil, nil, err
}
data := BufferData{buffer: &InsertData{
@ -1027,15 +974,11 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp,
func reportImportFunc(node *DataNode) importutil.ReportFunc {
return func(importResult *rootcoordpb.ImportResult) error {
err := retry.Do(context.Background(), func() error {
status, err := node.rootCoord.ReportImport(context.Background(), importResult)
err := node.broker.ReportImport(context.Background(), importResult)
if err != nil {
log.Error("fail to report import state to RootCoord", zap.Error(err))
return err
log.Error("failed to report import state to RootCoord", zap.Error(err))
}
if status.GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(status)
}
return nil
return err
}, retry.Attempts(node.reportImportRetryTimes))
return err

View File

@ -23,7 +23,9 @@ import (
"path/filepath"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"
@ -35,6 +37,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
allocator2 "github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
@ -48,11 +51,13 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type DataNodeServicesSuite struct {
suite.Suite
broker *broker.MockBroker
node *DataNode
etcdCli *clientv3.Client
ctx context.Context
@ -96,6 +101,25 @@ func (s *DataNodeServicesSuite) SetupTest() {
}, nil).Maybe()
s.node.allocator = alloc
meta := NewMetaFactory().GetCollectionMeta(1, "collection", schemapb.DataType_Int64)
broker := broker.NewMockBroker(s.T())
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).
Return([]*datapb.SegmentInfo{}, nil).Maybe()
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil),
Schema: meta.GetSchema(),
ShardsNum: common.DefaultShardsNum,
}, nil).Maybe()
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0),
func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe()
s.broker = broker
s.node.broker = broker
err = s.node.Start()
s.Require().NoError(err)
@ -104,8 +128,15 @@ func (s *DataNodeServicesSuite) SetupTest() {
}
func (s *DataNodeServicesSuite) TearDownTest() {
s.node.Stop()
s.node = nil
if s.broker != nil {
s.broker.AssertExpectations(s.T())
s.broker = nil
}
if s.node != nil {
s.node.Stop()
s.node = nil
}
}
func (s *DataNodeServicesSuite) TearDownSuite() {
@ -380,9 +411,8 @@ func (s *DataNodeServicesSuite) TestImport() {
collectionID: 100,
pkType: schemapb.DataType_Int64,
}
s.node.reportImportRetryTimes = 1 // save test time cost from 440s to 180s
s.Run("test normal", func() {
content := []byte(`{
content := []byte(`{
"rows":[
{"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]},
{"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]},
@ -391,7 +421,15 @@ func (s *DataNodeServicesSuite) TestImport() {
{"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]}
]
}`)
filePath := filepath.Join(s.node.chunkManager.RootPath(), "rows_1.json")
err := s.node.chunkManager.Write(s.ctx, filePath, content)
s.Require().NoError(err)
s.node.reportImportRetryTimes = 1 // save test time cost from 440s to 180s
s.Run("test normal", func() {
defer func() {
s.TearDownTest()
}()
chName1 := "fake-by-dev-rootcoord-dml-testimport-1"
chName2 := "fake-by-dev-rootcoord-dml-testimport-2"
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
@ -414,9 +452,6 @@ func (s *DataNodeServicesSuite) TestImport() {
_, ok = s.node.flowgraphManager.getFlowgraphService(chName2)
s.Require().True(ok)
filePath := filepath.Join(s.node.chunkManager.RootPath(), "rows_1.json")
err = s.node.chunkManager.Write(s.ctx, filePath, content)
s.Require().Nil(err)
req := &datapb.ImportTaskRequest{
ImportTask: &datapb.ImportTask{
CollectionId: 100,
@ -426,30 +461,14 @@ func (s *DataNodeServicesSuite) TestImport() {
RowBased: true,
},
}
s.node.rootCoord.(*RootCoordFactory).ReportImportErr = true
_, err = s.node.Import(s.ctx, req)
s.Assert().NoError(err)
s.node.rootCoord.(*RootCoordFactory).ReportImportErr = false
s.node.rootCoord.(*RootCoordFactory).ReportImportNotSuccess = true
_, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
s.node.rootCoord.(*RootCoordFactory).ReportImportNotSuccess = false
s.broker.EXPECT().ReportImport(mock.Anything, mock.Anything).Return(nil)
s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil)
s.broker.EXPECT().AssignSegmentID(mock.Anything, mock.Anything).
Return([]int64{10001}, nil)
s.broker.EXPECT().SaveImportSegment(mock.Anything, mock.Anything).Return(nil)
s.node.dataCoord.(*DataCoordFactory).AddSegmentError = true
_, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
s.node.dataCoord.(*DataCoordFactory).AddSegmentError = false
s.node.dataCoord.(*DataCoordFactory).AddSegmentNotSuccess = true
_, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
s.node.dataCoord.(*DataCoordFactory).AddSegmentNotSuccess = false
s.node.dataCoord.(*DataCoordFactory).AddSegmentEmpty = true
_, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
s.node.dataCoord.(*DataCoordFactory).AddSegmentEmpty = false
s.node.Import(s.ctx, req)
stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
@ -470,6 +489,10 @@ func (s *DataNodeServicesSuite) TestImport() {
})
s.Run("Test Import bad flow graph", func() {
s.SetupTest()
defer func() {
s.TearDownTest()
}()
chName1 := "fake-by-dev-rootcoord-dml-testimport-1-badflowgraph"
chName2 := "fake-by-dev-rootcoord-dml-testimport-2-badflowgraph"
err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
@ -492,19 +515,12 @@ func (s *DataNodeServicesSuite) TestImport() {
_, ok = s.node.flowgraphManager.getFlowgraphService(chName2)
s.Require().True(ok)
content := []byte(`{
"rows":[
{"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]},
{"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]},
{"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]},
{"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]},
{"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]}
]
}`)
s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil)
s.broker.EXPECT().ReportImport(mock.Anything, mock.Anything).Return(nil)
s.broker.EXPECT().AssignSegmentID(mock.Anything, mock.Anything).
Return([]int64{10001}, nil)
s.broker.EXPECT().SaveImportSegment(mock.Anything, mock.Anything).Return(nil)
filePath := filepath.Join(s.node.chunkManager.RootPath(), "rows_1.json")
err = s.node.chunkManager.Write(s.ctx, filePath, content)
s.Assert().NoError(err)
req := &datapb.ImportTaskRequest{
ImportTask: &datapb.ImportTask{
CollectionId: 100,
@ -519,25 +535,19 @@ func (s *DataNodeServicesSuite) TestImport() {
s.Assert().True(merr.Ok(stat))
s.Assert().Equal("", stat.GetReason())
})
s.Run("Test Import report import error", func() {
s.node.rootCoord = &RootCoordFactory{
collectionID: 100,
pkType: schemapb.DataType_Int64,
ReportImportErr: true,
}
content := []byte(`{
"rows":[
{"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]},
{"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]},
{"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]},
{"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]},
{"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]}
]
}`)
s.Run("test_Import_report_import_error", func() {
s.SetupTest()
s.node.reportImportRetryTimes = 1
defer func() {
s.TearDownTest()
}()
s.broker.EXPECT().AssignSegmentID(mock.Anything, mock.Anything).
Return([]int64{10001}, nil)
s.broker.EXPECT().ReportImport(mock.Anything, mock.Anything).Return(errors.New("mocked"))
s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil)
s.broker.EXPECT().SaveImportSegment(mock.Anything, mock.Anything).Return(nil)
filePath := filepath.Join(s.node.chunkManager.RootPath(), "rows_1.json")
err := s.node.chunkManager.Write(s.ctx, filePath, content)
s.Assert().NoError(err)
req := &datapb.ImportTaskRequest{
ImportTask: &datapb.ImportTask{
CollectionId: 100,
@ -552,8 +562,25 @@ func (s *DataNodeServicesSuite) TestImport() {
s.Assert().False(merr.Ok(stat))
})
s.Run("Test Import error", func() {
s.node.rootCoord = &RootCoordFactory{collectionID: -1}
s.Run("test_import_error", func() {
s.SetupTest()
defer func() {
s.TearDownTest()
}()
s.broker.ExpectedCalls = nil
s.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(merr.WrapErrCollectionNotFound("collection")),
}, nil)
s.broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).
Return([]*datapb.SegmentInfo{}, nil).Maybe()
s.broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0),
func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe()
s.broker.EXPECT().ReportImport(mock.Anything, mock.Anything).Return(nil)
req := &datapb.ImportTaskRequest{
ImportTask: &datapb.ImportTask{
CollectionId: 100,
@ -573,42 +600,6 @@ func (s *DataNodeServicesSuite) TestImport() {
s.Assert().NoError(err)
s.Assert().False(merr.Ok(stat))
})
s.Run("test get partitions", func() {
s.node.rootCoord = &RootCoordFactory{
ShowPartitionsErr: true,
}
_, err := s.node.getPartitions(context.Background(), "", "")
s.Assert().Error(err)
s.node.rootCoord = &RootCoordFactory{
ShowPartitionsNotSuccess: true,
}
_, err = s.node.getPartitions(context.Background(), "", "")
s.Assert().Error(err)
s.node.rootCoord = &RootCoordFactory{
ShowPartitionsNames: []string{"a", "b"},
ShowPartitionsIDs: []int64{1},
}
_, err = s.node.getPartitions(context.Background(), "", "")
s.Assert().Error(err)
s.node.rootCoord = &RootCoordFactory{
ShowPartitionsNames: []string{"a", "b"},
ShowPartitionsIDs: []int64{1, 2},
}
partitions, err := s.node.getPartitions(context.Background(), "", "")
s.Assert().NoError(err)
s.Assert().Contains(partitions, "a")
s.Assert().Equal(int64(1), partitions["a"])
s.Assert().Contains(partitions, "b")
s.Assert().Equal(int64(2), partitions["b"])
})
}
func (s *DataNodeServicesSuite) TestAddImportSegment() {

View File

@ -25,21 +25,18 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
// timeTickSender merges the channel states updated by flow graph nodes and sends them to datacoord periodically.
// timeTickSender holds a SegmentStats time sequence cache for each channel;
// after a send succeeds, it cleans the cached entries earlier than the sent timestamp.
type timeTickSender struct {
nodeID int64
dataCoord types.DataCoordClient
nodeID int64
broker broker.Broker
mu sync.Mutex
channelStatesCaches map[string]*segmentStatesSequence // string -> *segmentStatesSequence
@ -50,10 +47,10 @@ type segmentStatesSequence struct {
data map[uint64][]*commonpb.SegmentStats // ts -> segmentStats
}
func newTimeTickSender(dataCoord types.DataCoordClient, nodeID int64) *timeTickSender {
func newTimeTickSender(broker broker.Broker, nodeID int64) *timeTickSender {
return &timeTickSender{
nodeID: nodeID,
dataCoord: dataCoord,
broker: broker,
channelStatesCaches: make(map[string]*segmentStatesSequence, 0),
}
}
@ -159,23 +156,7 @@ func (m *timeTickSender) sendReport(ctx context.Context) error {
toSendMsgs, sendLastTss := m.mergeDatanodeTtMsg()
log.RatedDebug(30, "timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss))
err := retry.Do(ctx, func() error {
submitTs := tsoutil.ComposeTSByTime(time.Now(), 0)
status, err := m.dataCoord.ReportDataNodeTtMsgs(ctx, &datapb.ReportDataNodeTtMsgsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt),
commonpbutil.WithTimeStamp(submitTs),
commonpbutil.WithSourceID(m.nodeID),
),
Msgs: toSendMsgs,
})
if err == nil {
err = merr.Error(status)
}
if err != nil {
log.Warn("error happen when ReportDataNodeTtMsgs", zap.Error(err))
return err
}
return nil
return m.broker.ReportTimeTick(ctx, toSendMsgs)
}, retry.Attempts(20), retry.Sleep(time.Millisecond*100))
if err != nil {
log.Error("ReportDataNodeTtMsgs fail after retry", zap.Error(err))

View File

@ -21,20 +21,25 @@ import (
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/atomic"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func TestTimetickManagerNormal(t *testing.T) {
ctx := context.Background()
manager := newTimeTickSender(&DataCoordFactory{}, 0)
broker := broker.NewMockBroker(t)
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
manager := newTimeTickSender(broker, 0)
channelName1 := "channel1"
ts := uint64(time.Now().UnixMilli())
@ -129,26 +134,11 @@ func TestTimetickManagerNormal(t *testing.T) {
func TestTimetickManagerSendErr(t *testing.T) {
ctx := context.Background()
manager := newTimeTickSender(&DataCoordFactory{ReportDataNodeTtMsgsError: true}, 0)
channelName1 := "channel1"
ts := uint64(time.Now().Unix())
var segmentID1 int64 = 28257
segmentStats := []*commonpb.SegmentStats{
{
SegmentID: segmentID1,
NumRows: 100,
},
}
// update first time
manager.update(channelName1, ts, segmentStats)
err := manager.sendReport(ctx)
assert.Error(t, err)
}
broker := broker.NewMockBroker(t)
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(errors.New("mock")).Maybe()
func TestTimetickManagerSendNotSuccess(t *testing.T) {
ctx := context.Background()
manager := newTimeTickSender(&DataCoordFactory{ReportDataNodeTtMsgsNotSuccess: true}, 0)
manager := newTimeTickSender(broker, 0)
channelName1 := "channel1"
ts := uint64(time.Now().Unix())
@ -169,18 +159,20 @@ func TestTimetickManagerSendReport(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockDataCoord := mocks.NewMockDataCoordClient(t)
tsInMill := time.Now().UnixMilli()
validTs := atomic.NewBool(false)
mockDataCoord.EXPECT().ReportDataNodeTtMsgs(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest, opt ...grpc.CallOption) {
if req.GetBase().Timestamp > uint64(tsInMill) {
validTs.Store(true)
}
}).Return(merr.Success(), nil)
manager := newTimeTickSender(mockDataCoord, 0)
called := atomic.NewBool(false)
broker := broker.NewMockBroker(t)
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).
Run(func(_ context.Context, _ []*msgpb.DataNodeTtMsg) {
called.Store(true)
}).
Return(nil)
mockDataCoord.EXPECT().ReportDataNodeTtMsgs(mock.Anything, mock.Anything).Return(merr.Status(nil), nil).Maybe()
manager := newTimeTickSender(broker, 0)
go manager.start(ctx)
assert.Eventually(t, func() bool {
return validTs.Load()
return called.Load()
}, 2*time.Second, 500*time.Millisecond)
}