fix: Load original key if ts is MaxTimestamp (#36934)

Related to #36933

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/36952/head
congqixia 2024-10-17 14:11:29 +08:00 committed by GitHub
parent 811f6f2402
commit 1184319644
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 35 additions and 10 deletions

View File

@ -64,6 +64,9 @@ etcd:
metastore:
type: etcd # Default value: etcd, Valid values: [etcd, tikv]
snapshot:
ttl: 86400 # snapshot ttl in seconds
reserveTime: 3600 # snapshot reserve time in seconds
# Related configuration of tikv, used to store Milvus metadata.
# Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery.

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -42,10 +43,8 @@ import (
var (
// SuffixSnapshotTombstone special value for tombstone mark
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
PaginationSize = 5000
DefaultSnapshotReserveTime = 1 * time.Hour
DefaultSnapshotTTL = 24 * time.Hour
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
PaginationSize = 5000
)
// IsTombstone used in migration tool also.
@ -309,9 +308,9 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp)
}
func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
// if ts == 0, load latest by definition
// if ts == 0 or typeutil.MaxTimestamp, load latest by definition
// and with acceleration logic, just do load key will do
if ts == 0 {
if ts == 0 || ts == typeutil.MaxTimestamp {
value, err := ss.MetaKv.Load(key)
if ss.isTombstone(value) {
return "", errors.New("no value found")
@ -435,7 +434,7 @@ func (ss *SuffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil
// LoadWithPrefix load keys with provided prefix and returns value in the ts
func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
// ts 0 case shall be treated as fetch latest/current value
if ts == 0 {
if ts == 0 || ts == typeutil.MaxTimestamp {
keys, values, err := ss.MetaKv.LoadWithPrefix(key)
fks := keys[:0] // make([]string, 0, len(keys))
fvs := values[:0] // make([]string, 0, len(values))
@ -653,6 +652,9 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s
// It walks through all keys with the snapshot prefix, groups them by original key,
// and removes expired versions or all versions if the original key has been deleted
func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {
ttlTime := paramtable.Get().ServiceParam.MetaStoreCfg.SnapshotTTLSeconds.GetAsDuration(time.Second)
reserveTime := paramtable.Get().ServiceParam.MetaStoreCfg.SnapshotReserveTimeSeconds.GetAsDuration(time.Second)
candidateExpiredKeys := make([]string, 0)
latestOriginalKey := ""
latestOriginValue := ""
@ -673,7 +675,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {
for _, key := range candidateExpiredKeys {
ts, _ := ss.isTSKey(key)
expireTime, _ := tsoutil.ParseTS(ts)
if expireTime.Add(DefaultSnapshotTTL).Before(now) {
if expireTime.Add(ttlTime).Before(now) {
expiredKeys = append(expiredKeys, key)
}
}
@ -714,7 +716,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {
// Record versions that are already expired but not removed
time, _ := tsoutil.ParseTS(ts)
if time.Add(DefaultSnapshotReserveTime).Before(now) {
if time.Add(reserveTime).Before(now) {
candidateExpiredKeys = append(candidateExpiredKeys, key)
}

View File

@ -455,7 +455,9 @@ It is recommended to change this parameter before starting Milvus for the first
}
type MetaStoreConfig struct {
MetaStoreType ParamItem `refreshable:"false"`
MetaStoreType ParamItem `refreshable:"false"`
SnapshotTTLSeconds ParamItem `refreshable:"true"`
SnapshotReserveTimeSeconds ParamItem `refreshable:"true"`
}
func (p *MetaStoreConfig) Init(base *BaseTable) {
@ -468,6 +470,24 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
}
p.MetaStoreType.Init(base.mgr)
p.SnapshotTTLSeconds = ParamItem{
Key: "metastore.snapshot.ttl",
Version: "2.4.14",
DefaultValue: "86400",
Doc: `snapshot ttl in seconds`,
Export: true,
}
p.SnapshotTTLSeconds.Init(base.mgr)
p.SnapshotReserveTimeSeconds = ParamItem{
Key: "metastore.snapshot.reserveTime",
Version: "2.4.14",
DefaultValue: "3600",
Doc: `snapshot reserve time in seconds`,
Export: true,
}
p.SnapshotReserveTimeSeconds.Init(base.mgr)
// TODO: The initialization operation of metadata storage is called in the initialization phase of every node.
// There should be a single initialization operation for meta store, then move the metrics registration to there.
metrics.RegisterMetaType(p.MetaStoreType.GetValue())