Fixbug: segment allocate but return value incorrect (#6788)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/6795/head
zhenshan.cao 2021-07-24 20:49:20 +08:00 committed by GitHub
parent db94d7771f
commit 2962d31a5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 10 deletions

View File

@ -55,9 +55,7 @@ type assignInfo struct {
collID UniqueID
partitionID UniqueID
channelName string
segID UniqueID
segInfos *list.List
segCapacity uint32
lastInsertTime time.Time
}
@ -74,6 +72,8 @@ func (info *segInfo) Capacity(ts Timestamp) uint32 {
func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 {
if info.IsExpired(ts) {
log.Debug("segInfo Assign IsExpired", zap.Any("ts", ts),
zap.Any("count", count))
return 0
}
ret := uint32(0)
@ -81,8 +81,8 @@ func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 {
info.count -= count
ret = count
} else {
info.count = 0
ret = info.count
info.count = 0
}
return ret
}
@ -313,7 +313,7 @@ func (sa *SegIDAssigner) syncSegments() (bool, error) {
continue
}
assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName)
segInfo := &segInfo{
segInfo2 := &segInfo{
segID: info.SegID,
count: info.Count,
expireTime: info.ExpireTime,
@ -325,7 +325,7 @@ func (sa *SegIDAssigner) syncSegments() (bool, error) {
}
segInfos := list.New()
segInfos.PushBack(segInfo)
segInfos.PushBack(segInfo2)
assign = &assignInfo{
collID: info.CollectionID,
partitionID: info.PartitionID,
@ -335,7 +335,7 @@ func (sa *SegIDAssigner) syncSegments() (bool, error) {
colInfos.PushBack(assign)
sa.assignInfos[info.CollectionID] = colInfos
} else {
assign.segInfos.PushBack(segInfo)
assign.segInfos.PushBack(segInfo2)
}
assign.lastInsertTime = now
success = true
@ -349,9 +349,9 @@ func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
if err != nil {
return err
}
result, err := assign.Assign(segRequest.timestamp, segRequest.count)
result, err2 := assign.Assign(segRequest.timestamp, segRequest.count)
segRequest.segInfo = result
return err
return err2
}
func (sa *SegIDAssigner) GetSegmentID(collID UniqueID, partitionID UniqueID, channelName string, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {

View File

@ -817,7 +817,6 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
}
reqSegCountMap := make(map[int32]map[UniqueID]uint32)
for channelID, count := range channelCountMap {
ts, ok := channelMaxTSMap[channelID]
if !ok {
@ -830,6 +829,8 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
}
mapInfo, err := it.segIDAssigner.GetSegmentID(it.CollectionID, it.PartitionID, channelName, count, ts)
if err != nil {
log.Debug("InsertTask.go", zap.Any("MapInfo", mapInfo),
zap.Error(err))
return nil, err
}
reqSegCountMap[channelID] = make(map[UniqueID]uint32)
@ -880,7 +881,7 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
return segIDSlice[index]
}
}
log.Warn("Can't Found SegmentID")
log.Warn("Can't Found SegmentID", zap.Any("reqSegAllocateCounter", reqSegAllocateCounter))
return 0
}
@ -921,6 +922,9 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
rowID := insertRequest.RowIDs[index]
row := insertRequest.RowData[index]
segmentID := getSegmentID(key)
if segmentID == 0 {
return nil, fmt.Errorf("get SegmentID failed, segmentID is zero")
}
_, ok := result[key]
if !ok {
sliceRequest := internalpb.InsertRequest{