Fix golint error of segment.go in proxy package (#8629)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>

parent 577d3c76df
commit a662c62880
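
For context: golint flags every exported identifier that lacks a doc comment, with a warning of the form `exported type SegIDAssigner should have comment or be unexported`. This commit resolves the warnings both ways: `Allocator` and `DataCoord` stay exported and gain doc comments, while `SegIDAssigner`, `NewSegIDAssigner`, and `SegCountPerRPC` are used only inside the `proxy` package and are unexported instead (the renamed type keeps the committed spelling `segIDAssinger`). A minimal sketch of the two remedies, using hypothetical `widget` names:

package widget

// Widget is exported, so golint requires a doc comment
// beginning with the identifier's name.
type Widget struct{}

// widgetPool is unexported; golint no longer demands a comment,
// though one remains good practice.
type widgetPool struct{}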
internal/proxy/proxy.go

@@ -67,7 +67,7 @@ type Proxy struct {
     idAllocator  *allocator.IDAllocator
     tsoAllocator *TimestampAllocator
-    segAssigner  *SegIDAssigner
+    segAssigner  *segIDAssinger

     metricsCacheManager *metricsinfo.MetricsCacheManager


@@ -181,7 +181,7 @@ func (node *Proxy) Init() error {
     }
     node.tsoAllocator = tsoAllocator

-    segAssigner, err := NewSegIDAssigner(node.ctx, node.dataCoord, node.lastTick)
+    segAssigner, err := newSegIDAssigner(node.ctx, node.dataCoord, node.lastTick)
     if err != nil {
         panic(err)
     }
internal/proxy/segment.go

@@ -28,11 +28,13 @@ import (
 )

 const (
-    SegCountPerRPC = 20000
+    segCountPerRPC = 20000
 )

+// Allocator is an alias for the allocator.Allocator type.
 type Allocator = allocator.Allocator

+// DataCoord is a narrowed interface of DataCoordinator which only provides the AssignSegmentID method.
 type DataCoord interface {
     AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error)
 }
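
Narrowing DataCoord to a single method keeps the allocator decoupled from the full DataCoordinator client and makes it easy to stub, which the tests further below do with mockDataCoord. A minimal sketch of such a stub; the fakeDataCoord name and the empty response are illustrative assumptions, not part of this commit:

// fakeDataCoord is a hypothetical test double satisfying the DataCoord interface.
type fakeDataCoord struct{}

// AssignSegmentID returns an empty response; a real double would populate it
// with segment assignments matching the request.
func (f *fakeDataCoord) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
    return &datapb.AssignSegmentIDResponse{}, nil
}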
@@ -130,7 +132,7 @@ func (info *assignInfo) Assign(ts Timestamp, count uint32) (map[UniqueID]uint32,
     return result, nil
 }

-type SegIDAssigner struct {
+type segIDAssinger struct {
     Allocator
     assignInfos map[UniqueID]*list.List // collectionID -> *list.List
     segReqs     []*datapb.SegmentIDRequest

@@ -141,15 +143,15 @@ type SegIDAssigner struct {
     countPerRPC uint32
 }

-func NewSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func() Timestamp) (*SegIDAssigner, error) {
+func newSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func() Timestamp) (*segIDAssinger, error) {
     ctx1, cancel := context.WithCancel(ctx)
-    sa := &SegIDAssigner{
+    sa := &segIDAssinger{
         Allocator: Allocator{
             Ctx:        ctx1,
             CancelFunc: cancel,
             Role:       "SegmentIDAllocator",
         },
-        countPerRPC: SegCountPerRPC,
+        countPerRPC: segCountPerRPC,
         dataCoord:   dataCoord,
         assignInfos: make(map[UniqueID]*list.List),
         getTickFunc: getTickFunc,

@@ -165,7 +167,7 @@ func NewSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func
     return sa, nil
 }

-func (sa *SegIDAssigner) collectExpired() {
+func (sa *segIDAssinger) collectExpired() {
     ts := sa.getTickFunc()
     for _, info := range sa.assignInfos {
         for e := info.Front(); e != nil; e = e.Next() {

@@ -178,7 +180,7 @@ func (sa *SegIDAssigner) collectExpired() {
     }
 }

-func (sa *SegIDAssigner) pickCanDoFunc() {
+func (sa *segIDAssinger) pickCanDoFunc() {
     if sa.ToDoReqs == nil {
         return
     }

@@ -204,7 +206,7 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
         records[collID][partitionID][channelName] += segRequest.count
         assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
         if err != nil {
-            log.Debug("Proxy SegIDAssigner, pickCanDoFunc getAssign err:", zap.Any("collID", segRequest.collID),
+            log.Debug("Proxy segIDAssinger, pickCanDoFunc getAssign err:", zap.Any("collID", segRequest.collID),
                 zap.Any("partitionID", segRequest.partitionID), zap.Any("channelName", segRequest.channelName),
                 zap.Error(err))
         }

@@ -220,13 +222,13 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
             sa.CanDoReqs = append(sa.CanDoReqs, req)
         }
     }
-    log.Debug("Proxy SegIDAssigner pickCanDoFunc", zap.Any("records", records),
+    log.Debug("Proxy segIDAssinger pickCanDoFunc", zap.Any("records", records),
         zap.Any("len(newTodoReqs)", len(newTodoReqs)),
         zap.Any("len(CanDoReqs)", len(sa.CanDoReqs)))
     sa.ToDoReqs = newTodoReqs
 }

-func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channelName string) (*assignInfo, error) {
+func (sa *segIDAssinger) getAssign(collID UniqueID, partitionID UniqueID, channelName string) (*assignInfo, error) {
     assignInfos, ok := sa.assignInfos[collID]
     if !ok {
         return nil, fmt.Errorf("can not find collection %d", collID)

@@ -243,12 +245,12 @@ func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channe
         collID, partitionID, channelName)
 }

-func (sa *SegIDAssigner) checkSyncFunc(timeout bool) bool {
+func (sa *segIDAssinger) checkSyncFunc(timeout bool) bool {
     sa.collectExpired()
     return timeout || len(sa.segReqs) != 0
 }

-func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) bool {
+func (sa *segIDAssinger) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) bool {
     if req1 == nil || req2 == nil {
         return false
     }

@@ -259,8 +261,8 @@ func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) b
     return req1.CollectionID == req2.CollectionID && req1.PartitionID == req2.PartitionID && req1.ChannelName == req2.ChannelName
 }

-func (sa *SegIDAssigner) reduceSegReqs() {
-    log.Debug("Proxy SegIDAssigner reduceSegReqs", zap.Any("len(segReqs)", len(sa.segReqs)))
+func (sa *segIDAssinger) reduceSegReqs() {
+    log.Debug("Proxy segIDAssinger reduceSegReqs", zap.Any("len(segReqs)", len(sa.segReqs)))
     if len(sa.segReqs) == 0 {
         return
     }

@@ -268,7 +270,7 @@ func (sa *SegIDAssigner) reduceSegReqs() {
     var newSegReqs []*datapb.SegmentIDRequest
     for _, req1 := range sa.segReqs {
         if req1.Count == 0 {
-            log.Debug("Proxy SegIDAssigner reduceSegReqs hit perCount == 0")
+            log.Debug("Proxy segIDAssinger reduceSegReqs hit perCount == 0")
             req1.Count = sa.countPerRPC
         }
         beforeCnt += req1.Count

@@ -290,12 +292,12 @@ func (sa *SegIDAssigner) reduceSegReqs() {
         afterCnt += req.Count
     }
     sa.segReqs = newSegReqs
-    log.Debug("Proxy SegIDAssigner reduceSegReqs after reduce", zap.Any("len(segReqs)", len(sa.segReqs)),
+    log.Debug("Proxy segIDAssinger reduceSegReqs after reduce", zap.Any("len(segReqs)", len(sa.segReqs)),
         zap.Any("BeforeCnt", beforeCnt),
         zap.Any("AfterCnt", afterCnt))
 }

-func (sa *SegIDAssigner) syncSegments() (bool, error) {
+func (sa *segIDAssinger) syncSegments() (bool, error) {
     if len(sa.segReqs) == 0 {
         return true, nil
     }

@@ -362,7 +364,7 @@ func (sa *SegIDAssigner) syncSegments() (bool, error) {
     return success, nil
 }

-func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
+func (sa *segIDAssinger) processFunc(req allocator.Request) error {
     segRequest := req.(*segRequest)
     assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
     if err != nil {

@@ -373,7 +375,7 @@ func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
     return err2
 }

-func (sa *SegIDAssigner) GetSegmentID(collID UniqueID, partitionID UniqueID, channelName string, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
+func (sa *segIDAssinger) GetSegmentID(collID UniqueID, partitionID UniqueID, channelName string, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
     req := &segRequest{
         BaseRequest: allocator.BaseRequest{Done: make(chan error), Valid: false},
         collID:      collID,
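
GetSegmentID keeps its capitalized name; with the receiver type now unexported it is package-internal anyway, called by insertTask and exercised directly by the tests below. A usage sketch; allocateRows is a hypothetical helper, and the fixture values 1, 1, "abc" come from those tests:

// allocateRows requests count row slots for collection 1, partition 1,
// channel "abc" at timestamp ts; the result maps each assigned segment ID
// to the number of the requested rows it received.
func allocateRows(sa *segIDAssinger, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
    return sa.GetSegmentID(1, 1, "abc", count, ts)
}

As in the tests, the assigner must be built with newSegIDAssigner and running (Start) before GetSegmentID is called.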
internal/proxy/segment_test.go

@@ -86,7 +86,7 @@ func TestSegmentAllocator1(t *testing.T) {
     ctx := context.Background()
     dataCoord := &mockDataCoord{}
     dataCoord.expireTime = Timestamp(1000)
-    segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick1)
+    segAllocator, err := newSegIDAssigner(ctx, dataCoord, getLastTick1)
     assert.Nil(t, err)
     wg := &sync.WaitGroup{}
     segAllocator.Start()

@@ -107,9 +107,9 @@ func TestSegmentAllocator1(t *testing.T) {
     }
     assert.Equal(t, uint32(10), total)

-    ret, err := segAllocator.GetSegmentID(1, 1, "abc", SegCountPerRPC-10, 999)
+    ret, err := segAllocator.GetSegmentID(1, 1, "abc", segCountPerRPC-10, 999)
     assert.Nil(t, err)
-    assert.Equal(t, uint32(SegCountPerRPC-10), ret[1])
+    assert.Equal(t, uint32(segCountPerRPC-10), ret[1])

     _, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1001)
     assert.NotNil(t, err)

@@ -131,7 +131,7 @@ func TestSegmentAllocator2(t *testing.T) {
     ctx := context.Background()
     dataCoord := &mockDataCoord{}
     dataCoord.expireTime = Timestamp(500)
-    segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
+    segAllocator, err := newSegIDAssigner(ctx, dataCoord, getLastTick2)
     assert.Nil(t, err)
     wg := &sync.WaitGroup{}
     segAllocator.Start()

@@ -150,7 +150,7 @@ func TestSegmentAllocator2(t *testing.T) {
     }
     assert.Equal(t, uint32(10), total)
     time.Sleep(50 * time.Millisecond)
-    _, err = segAllocator.GetSegmentID(1, 1, "abc", SegCountPerRPC-10, getLastTick2())
+    _, err = segAllocator.GetSegmentID(1, 1, "abc", segCountPerRPC-10, getLastTick2())
     assert.NotNil(t, err)
     wg.Wait()

@@ -160,7 +160,7 @@ func TestSegmentAllocator3(t *testing.T) {
     ctx := context.Background()
     dataCoord := &mockDataCoord2{}
     dataCoord.expireTime = Timestamp(500)
-    segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
+    segAllocator, err := newSegIDAssigner(ctx, dataCoord, getLastTick2)
     assert.Nil(t, err)
     wg := &sync.WaitGroup{}
     segAllocator.Start()

@@ -218,7 +218,7 @@ func TestSegmentAllocator4(t *testing.T) {
     ctx := context.Background()
     dataCoord := &mockDataCoord3{}
     dataCoord.expireTime = Timestamp(500)
-    segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
+    segAllocator, err := newSegIDAssigner(ctx, dataCoord, getLastTick2)
     assert.Nil(t, err)
     wg := &sync.WaitGroup{}
     segAllocator.Start()

@@ -253,7 +253,7 @@ func TestSegmentAllocator5(t *testing.T) {
     ctx := context.Background()
     dataCoord := &mockDataCoord5{}
     dataCoord.expireTime = Timestamp(500)
-    segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
+    segAllocator, err := newSegIDAssigner(ctx, dataCoord, getLastTick2)
     assert.Nil(t, err)
     wg := &sync.WaitGroup{}
     segAllocator.Start()

@@ -274,7 +274,7 @@ func TestSegmentAllocator6(t *testing.T) {
     ctx := context.Background()
     dataCoord := &mockDataCoord{}
     dataCoord.expireTime = Timestamp(500)
-    segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
+    segAllocator, err := newSegIDAssigner(ctx, dataCoord, getLastTick2)
     assert.Nil(t, err)
     wg := &sync.WaitGroup{}
     segAllocator.Start()
internal/proxy/task.go

@@ -121,7 +121,7 @@ type insertTask struct {

     result         *milvuspb.MutationResult
     rowIDAllocator *allocator.IDAllocator
-    segIDAssigner  *SegIDAssigner
+    segIDAssigner  *segIDAssinger
     chMgr          channelsMgr
     chTicker       channelsTimeTicker
     vChannels      []vChan
internal/proxy/task_test.go

@@ -3053,7 +3053,7 @@ func TestInsertTask_all(t *testing.T) {
     _ = idAllocator.Start()
     defer idAllocator.Close()

-    segAllocator, err := NewSegIDAssigner(ctx, &mockDataCoord{expireTime: Timestamp(2500)}, getLastTick1)
+    segAllocator, err := newSegIDAssigner(ctx, &mockDataCoord{expireTime: Timestamp(2500)}, getLastTick1)
     assert.NoError(t, err)
     segAllocator.Init()
     _ = segAllocator.Start()