mirror of https://github.com/milvus-io/milvus.git
Delete end key after rocksdb DeleteRange (#9572)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>pull/9572/merge
parent
a7dc9bdf15
commit
b6b4b784cc
|
@ -466,16 +466,17 @@ func (ri *retentionInfo) newExpiredCleanUp(topic string) error {
|
|||
if pValue != nil {
|
||||
pValue.Free()
|
||||
}
|
||||
pKey := pageIter.Key()
|
||||
pKeyStr := string(pKey.Data())
|
||||
if pKey != nil {
|
||||
pKey.Free()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
curDeleteSize := deletedAckedSize + size
|
||||
if msgSizeExpiredCheck(curDeleteSize, totalAckedSize) {
|
||||
pKey := pageIter.Key()
|
||||
endID, err = strconv.ParseInt(string(pKey.Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
if pKey != nil {
|
||||
pKey.Free()
|
||||
}
|
||||
endID, err = strconv.ParseInt(pKeyStr[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -495,15 +496,17 @@ func (ri *retentionInfo) newExpiredCleanUp(topic string) error {
|
|||
defer writeBatch.Destroy()
|
||||
|
||||
pageStartIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageStartID, 10)
|
||||
pageEndIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageEndID, 10)
|
||||
pageEndIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageEndID+1, 10)
|
||||
if pageStartID == pageEndID {
|
||||
writeBatch.Delete([]byte(pageStartIDKey))
|
||||
if pageStartID != 0 {
|
||||
writeBatch.Delete([]byte(pageStartIDKey))
|
||||
}
|
||||
} else if pageStartID < pageEndID {
|
||||
writeBatch.DeleteRange([]byte(pageStartIDKey), []byte(pageEndIDKey))
|
||||
}
|
||||
|
||||
ackedStartIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(startID))
|
||||
ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(endID))
|
||||
ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(endID+1))
|
||||
if startID > endID {
|
||||
return nil
|
||||
} else if startID == endID {
|
||||
|
@ -725,7 +728,7 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err
|
|||
log.Debug("RocksMQ: combKey(" + topic + "," + strconv.FormatInt(startID, 10) + ")")
|
||||
return err
|
||||
}
|
||||
endKey, err := combKey(topic, endID)
|
||||
endKey, err := combKey(topic, endID+1)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: combKey(" + topic + "," + strconv.FormatInt(endID, 10) + ")")
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue