mirror of https://github.com/milvus-io/milvus.git
parent
782948a30f
commit
827a18a7c1
|
@ -67,7 +67,7 @@ type Proxy struct {
|
|||
|
||||
idAllocator *allocator.IDAllocator
|
||||
tsoAllocator *TimestampAllocator
|
||||
segAssigner *segIDAssinger
|
||||
segAssigner *segIDAssigner
|
||||
|
||||
metricsCacheManager *metricsinfo.MetricsCacheManager
|
||||
|
||||
|
|
|
@ -132,7 +132,7 @@ func (info *assignInfo) Assign(ts Timestamp, count uint32) (map[UniqueID]uint32,
|
|||
return result, nil
|
||||
}
|
||||
|
||||
type segIDAssinger struct {
|
||||
type segIDAssigner struct {
|
||||
Allocator
|
||||
assignInfos map[UniqueID]*list.List // collectionID -> *list.List
|
||||
segReqs []*datapb.SegmentIDRequest
|
||||
|
@ -143,9 +143,9 @@ type segIDAssinger struct {
|
|||
countPerRPC uint32
|
||||
}
|
||||
|
||||
func newSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func() Timestamp) (*segIDAssinger, error) {
|
||||
func newSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func() Timestamp) (*segIDAssigner, error) {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
sa := &segIDAssinger{
|
||||
sa := &segIDAssigner{
|
||||
Allocator: Allocator{
|
||||
Ctx: ctx1,
|
||||
CancelFunc: cancel,
|
||||
|
@ -167,7 +167,7 @@ func newSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func
|
|||
return sa, nil
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) collectExpired() {
|
||||
func (sa *segIDAssigner) collectExpired() {
|
||||
ts := sa.getTickFunc()
|
||||
for _, info := range sa.assignInfos {
|
||||
for e := info.Front(); e != nil; e = e.Next() {
|
||||
|
@ -180,7 +180,7 @@ func (sa *segIDAssinger) collectExpired() {
|
|||
}
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) pickCanDoFunc() {
|
||||
func (sa *segIDAssigner) pickCanDoFunc() {
|
||||
if sa.ToDoReqs == nil {
|
||||
return
|
||||
}
|
||||
|
@ -228,7 +228,7 @@ func (sa *segIDAssinger) pickCanDoFunc() {
|
|||
sa.ToDoReqs = newTodoReqs
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) getAssign(collID UniqueID, partitionID UniqueID, channelName string) (*assignInfo, error) {
|
||||
func (sa *segIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channelName string) (*assignInfo, error) {
|
||||
assignInfos, ok := sa.assignInfos[collID]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("can not find collection %d", collID)
|
||||
|
@ -245,12 +245,12 @@ func (sa *segIDAssinger) getAssign(collID UniqueID, partitionID UniqueID, channe
|
|||
collID, partitionID, channelName)
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) checkSyncFunc(timeout bool) bool {
|
||||
func (sa *segIDAssigner) checkSyncFunc(timeout bool) bool {
|
||||
sa.collectExpired()
|
||||
return timeout || len(sa.segReqs) != 0
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) bool {
|
||||
func (sa *segIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) bool {
|
||||
if req1 == nil || req2 == nil {
|
||||
return false
|
||||
}
|
||||
|
@ -261,7 +261,7 @@ func (sa *segIDAssinger) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) b
|
|||
return req1.CollectionID == req2.CollectionID && req1.PartitionID == req2.PartitionID && req1.ChannelName == req2.ChannelName
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) reduceSegReqs() {
|
||||
func (sa *segIDAssigner) reduceSegReqs() {
|
||||
log.Debug("Proxy segIDAssinger reduceSegReqs", zap.Any("len(segReqs)", len(sa.segReqs)))
|
||||
if len(sa.segReqs) == 0 {
|
||||
return
|
||||
|
@ -297,7 +297,7 @@ func (sa *segIDAssinger) reduceSegReqs() {
|
|||
zap.Any("AfterCnt", afterCnt))
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) syncSegments() (bool, error) {
|
||||
func (sa *segIDAssigner) syncSegments() (bool, error) {
|
||||
if len(sa.segReqs) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
@ -364,7 +364,7 @@ func (sa *segIDAssinger) syncSegments() (bool, error) {
|
|||
return success, nil
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) processFunc(req allocator.Request) error {
|
||||
func (sa *segIDAssigner) processFunc(req allocator.Request) error {
|
||||
segRequest := req.(*segRequest)
|
||||
assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
|
||||
if err != nil {
|
||||
|
@ -375,7 +375,7 @@ func (sa *segIDAssinger) processFunc(req allocator.Request) error {
|
|||
return err2
|
||||
}
|
||||
|
||||
func (sa *segIDAssinger) GetSegmentID(collID UniqueID, partitionID UniqueID, channelName string, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
|
||||
func (sa *segIDAssigner) GetSegmentID(collID UniqueID, partitionID UniqueID, channelName string, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
|
||||
req := &segRequest{
|
||||
BaseRequest: allocator.BaseRequest{Done: make(chan error), Valid: false},
|
||||
collID: collID,
|
||||
|
|
|
@ -121,7 +121,7 @@ type insertTask struct {
|
|||
|
||||
result *milvuspb.MutationResult
|
||||
rowIDAllocator *allocator.IDAllocator
|
||||
segIDAssigner *segIDAssinger
|
||||
segIDAssigner *segIDAssigner
|
||||
chMgr channelsMgr
|
||||
chTicker channelsTimeTicker
|
||||
vChannels []vChan
|
||||
|
|
Loading…
Reference in New Issue