mirror of https://github.com/milvus-io/milvus.git
Refactor datacoord allocator (#7226)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/7235/head
parent
db25550fd0
commit
f5451d12f3
|
@ -19,25 +19,30 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
)
|
||||
|
||||
// allocator is the interface for allocating `UniqueID` or `Timestamp`
|
||||
type allocator interface {
|
||||
allocTimestamp() (Timestamp, error)
|
||||
allocID() (UniqueID, error)
|
||||
allocTimestamp(context.Context) (Timestamp, error)
|
||||
allocID(context.Context) (UniqueID, error)
|
||||
}
|
||||
|
||||
var _ allocator = (*rootCoordAllocator)(nil)
|
||||
|
||||
// rootCoordAllocator use RootCoord as allocator
|
||||
type rootCoordAllocator struct {
|
||||
ctx context.Context
|
||||
rootCoordClient types.RootCoord
|
||||
types.RootCoord
|
||||
}
|
||||
|
||||
func newRootCoordAllocator(ctx context.Context, rootCoordClient types.RootCoord) *rootCoordAllocator {
|
||||
// newRootCoordAllocator get an allocator from RootCoord
|
||||
func newRootCoordAllocator(rootCoordClient types.RootCoord) allocator {
|
||||
return &rootCoordAllocator{
|
||||
ctx: ctx,
|
||||
rootCoordClient: rootCoordClient,
|
||||
RootCoord: rootCoordClient,
|
||||
}
|
||||
}
|
||||
|
||||
func (alloc *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
|
||||
resp, err := alloc.rootCoordClient.AllocTimestamp(alloc.ctx, &rootcoordpb.AllocTimestampRequest{
|
||||
// allocTimestamp allocate a Timestamp
|
||||
// invoking RootCoord `AllocTimestamp`
|
||||
func (alloc *rootCoordAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) {
|
||||
resp, err := alloc.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_RequestTSO,
|
||||
MsgID: 0,
|
||||
|
@ -52,8 +57,9 @@ func (alloc *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
|
|||
return resp.Timestamp, nil
|
||||
}
|
||||
|
||||
func (alloc *rootCoordAllocator) allocID() (UniqueID, error) {
|
||||
resp, err := alloc.rootCoordClient.AllocID(alloc.ctx, &rootcoordpb.AllocIDRequest{
|
||||
// allocID allocate an `UniqueID` from RootCoord, invoking AllocID grpc
|
||||
func (alloc *rootCoordAllocator) allocID(ctx context.Context) (UniqueID, error) {
|
||||
resp, err := alloc.AllocID(ctx, &rootcoordpb.AllocIDRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_RequestID,
|
||||
MsgID: 0,
|
||||
|
|
|
@ -12,23 +12,35 @@
|
|||
package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestAllocator_Basic(t *testing.T) {
|
||||
ms := newMockRootCoordService()
|
||||
allocator := newRootCoordAllocator(context.Background(), ms)
|
||||
allocator := newRootCoordAllocator(ms)
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("Test allocTimestamp", func(t *testing.T) {
|
||||
_, err := allocator.allocTimestamp()
|
||||
_, err := allocator.allocTimestamp(ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Test allocID", func(t *testing.T) {
|
||||
_, err := allocator.allocID()
|
||||
_, err := allocator.allocID(ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Test Unhealthy Root", func(t *testing.T) {
|
||||
ms := newMockRootCoordService()
|
||||
allocator := newRootCoordAllocator(ms)
|
||||
ms.Stop()
|
||||
|
||||
_, err := allocator.allocTimestamp(ctx)
|
||||
assert.Error(t, err)
|
||||
_, err = allocator.allocID(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ type vchannel struct {
|
|||
// positionProvider provides vchannel pair related position pairs
|
||||
type positionProvider interface {
|
||||
GetVChanPositions(vchans []vchannel, seekFromStartPosition bool) ([]*datapb.VchannelInfo, error)
|
||||
GetDdlChannel() string
|
||||
}
|
||||
|
||||
type dummyPosProvider struct{}
|
||||
|
@ -41,13 +40,3 @@ func (dp dummyPosProvider) GetVChanPositions(vchans []vchannel, seekFromStartPos
|
|||
}
|
||||
return pairs, nil
|
||||
}
|
||||
|
||||
//GetDdlChannel implements positionProvider
|
||||
func (dp dummyPosProvider) GetDdlChannel() string {
|
||||
return "dummy_ddl"
|
||||
}
|
||||
|
||||
//GetDdlChannel implements positionProvider
|
||||
func (s *Server) GetDdlChannel() string {
|
||||
return s.ddChannelName
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
package datacoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
@ -24,6 +25,7 @@ func TestMeta_Basic(t *testing.T) {
|
|||
const partID0 = UniqueID(100)
|
||||
const partID1 = UniqueID(101)
|
||||
const channelName = "c1"
|
||||
ctx := context.Background()
|
||||
|
||||
mockAllocator := newMockAllocator()
|
||||
meta, err := newMemoryMeta(mockAllocator)
|
||||
|
@ -58,13 +60,13 @@ func TestMeta_Basic(t *testing.T) {
|
|||
t.Run("Test Segment", func(t *testing.T) {
|
||||
meta.AddCollection(collInfoWoPartition)
|
||||
// create seg0 for partition0, seg0/seg1 for partition1
|
||||
segID0_0, err := mockAllocator.allocID()
|
||||
segID0_0, err := mockAllocator.allocID(ctx)
|
||||
assert.Nil(t, err)
|
||||
segInfo0_0 := buildSegment(collID, partID0, segID0_0, channelName)
|
||||
segID1_0, err := mockAllocator.allocID()
|
||||
segID1_0, err := mockAllocator.allocID(ctx)
|
||||
assert.Nil(t, err)
|
||||
segInfo1_0 := buildSegment(collID, partID1, segID1_0, channelName)
|
||||
segID1_1, err := mockAllocator.allocID()
|
||||
segID1_1, err := mockAllocator.allocID(ctx)
|
||||
assert.Nil(t, err)
|
||||
segInfo1_1 := buildSegment(collID, partID1, segID1_1, channelName)
|
||||
|
||||
|
@ -127,7 +129,7 @@ func TestMeta_Basic(t *testing.T) {
|
|||
assert.EqualValues(t, 0, nums)
|
||||
|
||||
// add seg1 with 100 rows
|
||||
segID0, err := mockAllocator.allocID()
|
||||
segID0, err := mockAllocator.allocID(ctx)
|
||||
assert.Nil(t, err)
|
||||
segInfo0 := buildSegment(collID, partID0, segID0, channelName)
|
||||
segInfo0.NumOfRows = rowCount0
|
||||
|
@ -135,7 +137,7 @@ func TestMeta_Basic(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
// add seg2 with 300 rows
|
||||
segID1, err := mockAllocator.allocID()
|
||||
segID1, err := mockAllocator.allocID(ctx)
|
||||
assert.Nil(t, err)
|
||||
segInfo1 := buildSegment(collID, partID0, segID1, channelName)
|
||||
segInfo1.NumOfRows = rowCount1
|
||||
|
|
|
@ -32,18 +32,20 @@ func newMemoryMeta(allocator allocator) (*meta, error) {
|
|||
return newMeta(memoryKV)
|
||||
}
|
||||
|
||||
var _ allocator = (*MockAllocator)(nil)
|
||||
|
||||
type MockAllocator struct {
|
||||
cnt int64
|
||||
}
|
||||
|
||||
func (m *MockAllocator) allocTimestamp() (Timestamp, error) {
|
||||
func (m *MockAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) {
|
||||
val := atomic.AddInt64(&m.cnt, 1)
|
||||
phy := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
ts := tsoutil.ComposeTS(phy, val)
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func (m *MockAllocator) allocID() (UniqueID, error) {
|
||||
func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) {
|
||||
val := atomic.AddInt64(&m.cnt, 1)
|
||||
return val, nil
|
||||
}
|
||||
|
@ -121,11 +123,12 @@ func (c *mockDataNodeClient) Stop() error {
|
|||
}
|
||||
|
||||
type mockRootCoordService struct {
|
||||
state internalpb.StateCode
|
||||
cnt int64
|
||||
}
|
||||
|
||||
func newMockRootCoordService() *mockRootCoordService {
|
||||
return &mockRootCoordService{}
|
||||
return &mockRootCoordService{state: internalpb.StateCode_Healthy}
|
||||
}
|
||||
|
||||
func (m *mockRootCoordService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
|
@ -141,6 +144,7 @@ func (m *mockRootCoordService) Start() error {
|
|||
}
|
||||
|
||||
func (m *mockRootCoordService) Stop() error {
|
||||
m.state = internalpb.StateCode_Abnormal
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -153,7 +157,7 @@ func (m *mockRootCoordService) GetComponentStates(ctx context.Context) (*interna
|
|||
State: &internalpb.ComponentInfo{
|
||||
NodeID: 0,
|
||||
Role: "",
|
||||
StateCode: internalpb.StateCode_Healthy,
|
||||
StateCode: m.state,
|
||||
ExtraInfo: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
SubcomponentStates: []*internalpb.ComponentInfo{},
|
||||
|
@ -243,6 +247,10 @@ func (m *mockRootCoordService) DropIndex(ctx context.Context, req *milvuspb.Drop
|
|||
|
||||
//global timestamp allocator
|
||||
func (m *mockRootCoordService) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
|
||||
if m.state != internalpb.StateCode_Healthy {
|
||||
return &rootcoordpb.AllocTimestampResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil
|
||||
}
|
||||
|
||||
val := atomic.AddInt64(&m.cnt, int64(req.Count))
|
||||
phy := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
ts := tsoutil.ComposeTS(phy, val)
|
||||
|
@ -257,6 +265,9 @@ func (m *mockRootCoordService) AllocTimestamp(ctx context.Context, req *rootcoor
|
|||
}
|
||||
|
||||
func (m *mockRootCoordService) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
|
||||
if m.state != internalpb.StateCode_Healthy {
|
||||
return &rootcoordpb.AllocIDResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil
|
||||
}
|
||||
val := atomic.AddInt64(&m.cnt, int64(req.Count))
|
||||
return &rootcoordpb.AllocIDResponse{
|
||||
Status: &commonpb.Status{
|
||||
|
|
|
@ -234,7 +234,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
|
|||
requestRows, int64(maxCountPerSegment))
|
||||
|
||||
// create new segments and add allocations
|
||||
expireTs, err := s.genExpireTs()
|
||||
expireTs, err := s.genExpireTs(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -261,8 +261,8 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
|
|||
return allocations, nil
|
||||
}
|
||||
|
||||
func (s *SegmentManager) genExpireTs() (Timestamp, error) {
|
||||
ts, err := s.allocator.allocTimestamp()
|
||||
func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) {
|
||||
ts, err := s.allocator.allocTimestamp(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -275,7 +275,7 @@ func (s *SegmentManager) genExpireTs() (Timestamp, error) {
|
|||
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error) {
|
||||
sp, _ := trace.StartSpanFromContext(ctx)
|
||||
defer sp.Finish()
|
||||
id, err := s.allocator.allocID()
|
||||
id, err := s.allocator.allocID(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ func TestAllocSegment(t *testing.T) {
|
|||
segmentManager := newSegmentManager(meta, mockAllocator)
|
||||
|
||||
schema := newTestSchema()
|
||||
collID, err := mockAllocator.allocID()
|
||||
collID, err := mockAllocator.allocID(ctx)
|
||||
assert.Nil(t, err)
|
||||
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
|
||||
|
||||
|
@ -45,13 +45,14 @@ func TestAllocSegment(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLoadSegmentsFromMeta(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
Params.Init()
|
||||
mockAllocator := newMockAllocator()
|
||||
meta, err := newMemoryMeta(mockAllocator)
|
||||
assert.Nil(t, err)
|
||||
|
||||
schema := newTestSchema()
|
||||
collID, err := mockAllocator.allocID()
|
||||
collID, err := mockAllocator.allocID(ctx)
|
||||
assert.Nil(t, err)
|
||||
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
|
||||
|
||||
|
@ -101,7 +102,7 @@ func TestSaveSegmentsToMeta(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
schema := newTestSchema()
|
||||
collID, err := mockAllocator.allocID()
|
||||
collID, err := mockAllocator.allocID(context.Background())
|
||||
assert.Nil(t, err)
|
||||
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
|
||||
segmentManager := newSegmentManager(meta, mockAllocator)
|
||||
|
@ -123,7 +124,7 @@ func TestDropSegment(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
schema := newTestSchema()
|
||||
collID, err := mockAllocator.allocID()
|
||||
collID, err := mockAllocator.allocID(context.Background())
|
||||
assert.Nil(t, err)
|
||||
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
|
||||
segmentManager := newSegmentManager(meta, mockAllocator)
|
||||
|
@ -146,7 +147,7 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
schema := newTestSchema()
|
||||
collID, err := mockAllocator.allocID()
|
||||
collID, err := mockAllocator.allocID(context.Background())
|
||||
assert.Nil(t, err)
|
||||
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
|
||||
|
||||
|
@ -168,7 +169,7 @@ func TestExpireAllocation(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
schema := newTestSchema()
|
||||
collID, err := mockAllocator.allocID()
|
||||
collID, err := mockAllocator.allocID(context.Background())
|
||||
assert.Nil(t, err)
|
||||
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
|
||||
|
||||
|
@ -210,7 +211,7 @@ func TestGetFlushableSegments(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
schema := newTestSchema()
|
||||
collID, err := mockAllocator.allocID()
|
||||
collID, err := mockAllocator.allocID(context.Background())
|
||||
assert.Nil(t, err)
|
||||
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
|
||||
segmentManager := newSegmentManager(meta, mockAllocator)
|
||||
|
|
|
@ -197,7 +197,7 @@ func (s *Server) Start() error {
|
|||
return err
|
||||
}
|
||||
|
||||
s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)
|
||||
s.allocator = newRootCoordAllocator(s.rootCoordClient)
|
||||
|
||||
s.startSegmentManager()
|
||||
if err = s.initServiceDiscovery(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue