fix: [10kcp] Fix timeout when listing meta (#38152)

When there are too many key-value pairs, the etcd list operation may
time out. This PR replaces LoadWithPrefix in list operations, which
could involve many keys, with WalkWithPrefix.

issue: https://github.com/milvus-io/milvus/issues/37917

pr: https://github.com/milvus-io/milvus/pull/38151

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/38229/head
yihao.dai 2024-12-03 14:15:49 +08:00 committed by GitHub
parent 0c29d8ff64
commit 3219b869a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 164 additions and 124 deletions

View File

@ -469,23 +469,23 @@ func (kc *Catalog) DropChannel(ctx context.Context, channel string) error {
}
func (kc *Catalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) {
keys, values, err := kc.MetaKv.LoadWithPrefix(ChannelCheckpointPrefix)
if err != nil {
return nil, err
}
channelCPs := make(map[string]*msgpb.MsgPosition)
for i, key := range keys {
value := values[i]
applyFn := func(key []byte, value []byte) error {
channelCP := &msgpb.MsgPosition{}
err = proto.Unmarshal([]byte(value), channelCP)
err := proto.Unmarshal(value, channelCP)
if err != nil {
log.Error("unmarshal channelCP failed when ListChannelCheckpoint", zap.Error(err))
return nil, err
return err
}
ss := strings.Split(key, "/")
ss := strings.Split(string(key), "/")
vChannel := ss[len(ss)-1]
channelCPs[vChannel] = channelCP
return nil
}
err := kc.MetaKv.WalkWithPrefix(ChannelCheckpointPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return channelCPs, nil
@ -555,24 +555,23 @@ func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error {
}
func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) {
_, values, err := kc.MetaKv.LoadWithPrefix(util.FieldIndexPrefix)
if err != nil {
log.Error("list index meta fail", zap.String("prefix", util.FieldIndexPrefix), zap.Error(err))
return nil, err
}
indexes := make([]*model.Index, 0)
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
meta := &indexpb.FieldIndex{}
err = proto.Unmarshal([]byte(value), meta)
err := proto.Unmarshal(value, meta)
if err != nil {
log.Warn("unmarshal index info failed", zap.Error(err))
return nil, err
return err
}
indexes = append(indexes, model.UnmarshalIndexModel(meta))
return nil
}
err := kc.MetaKv.WalkWithPrefix(util.FieldIndexPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return indexes, nil
}
@ -632,22 +631,22 @@ func (kc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.Segment
}
func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentIndex, error) {
_, values, err := kc.MetaKv.LoadWithPrefix(util.SegmentIndexPrefix)
if err != nil {
log.Error("list segment index meta fail", zap.String("prefix", util.SegmentIndexPrefix), zap.Error(err))
return nil, err
}
segIndexes := make([]*model.SegmentIndex, 0)
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
segmentIndexInfo := &indexpb.SegmentIndex{}
err = proto.Unmarshal([]byte(value), segmentIndexInfo)
err := proto.Unmarshal(value, segmentIndexInfo)
if err != nil {
log.Warn("unmarshal segment index info failed", zap.Error(err))
return segIndexes, err
return err
}
segIndexes = append(segIndexes, model.UnmarshalSegmentIndexModel(segmentIndexInfo))
return nil
}
err := kc.MetaKv.WalkWithPrefix(util.SegmentIndexPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return segIndexes, nil
@ -689,17 +688,19 @@ func (kc *Catalog) SaveImportJob(job *datapb.ImportJob) error {
func (kc *Catalog) ListImportJobs() ([]*datapb.ImportJob, error) {
jobs := make([]*datapb.ImportJob, 0)
_, values, err := kc.MetaKv.LoadWithPrefix(ImportJobPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
job := &datapb.ImportJob{}
err = proto.Unmarshal([]byte(value), job)
err := proto.Unmarshal(value, job)
if err != nil {
return nil, err
return err
}
jobs = append(jobs, job)
return nil
}
err := kc.MetaKv.WalkWithPrefix(ImportJobPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return jobs, nil
}
@ -721,19 +722,20 @@ func (kc *Catalog) SavePreImportTask(task *datapb.PreImportTask) error {
func (kc *Catalog) ListPreImportTasks() ([]*datapb.PreImportTask, error) {
tasks := make([]*datapb.PreImportTask, 0)
_, values, err := kc.MetaKv.LoadWithPrefix(PreImportTaskPrefix)
applyFn := func(key []byte, value []byte) error {
task := &datapb.PreImportTask{}
err := proto.Unmarshal(value, task)
if err != nil {
return err
}
tasks = append(tasks, task)
return nil
}
err := kc.MetaKv.WalkWithPrefix(PreImportTaskPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
for _, value := range values {
task := &datapb.PreImportTask{}
err = proto.Unmarshal([]byte(value), task)
if err != nil {
return nil, err
}
tasks = append(tasks, task)
}
return tasks, nil
}
@ -754,17 +756,19 @@ func (kc *Catalog) SaveImportTask(task *datapb.ImportTaskV2) error {
func (kc *Catalog) ListImportTasks() ([]*datapb.ImportTaskV2, error) {
tasks := make([]*datapb.ImportTaskV2, 0)
_, values, err := kc.MetaKv.LoadWithPrefix(ImportTaskPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
task := &datapb.ImportTaskV2{}
err = proto.Unmarshal([]byte(value), task)
err := proto.Unmarshal(value, task)
if err != nil {
return nil, err
return err
}
tasks = append(tasks, task)
return nil
}
err := kc.MetaKv.WalkWithPrefix(ImportTaskPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return tasks, nil
}
@ -792,17 +796,19 @@ func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID type
func (kc *Catalog) ListCompactionTask(ctx context.Context) ([]*datapb.CompactionTask, error) {
tasks := make([]*datapb.CompactionTask, 0)
_, values, err := kc.MetaKv.LoadWithPrefix(CompactionTaskPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
info := &datapb.CompactionTask{}
err = proto.Unmarshal([]byte(value), info)
err := proto.Unmarshal(value, info)
if err != nil {
return nil, err
return err
}
tasks = append(tasks, info)
return nil
}
err := kc.MetaKv.WalkWithPrefix(CompactionTaskPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return tasks, nil
}
@ -829,17 +835,19 @@ func (kc *Catalog) DropCompactionTask(ctx context.Context, task *datapb.Compacti
func (kc *Catalog) ListAnalyzeTasks(ctx context.Context) ([]*indexpb.AnalyzeTask, error) {
tasks := make([]*indexpb.AnalyzeTask, 0)
_, values, err := kc.MetaKv.LoadWithPrefix(AnalyzeTaskPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
task := &indexpb.AnalyzeTask{}
err = proto.Unmarshal([]byte(value), task)
err := proto.Unmarshal(value, task)
if err != nil {
return nil, err
return err
}
tasks = append(tasks, task)
return nil
}
err := kc.MetaKv.WalkWithPrefix(AnalyzeTaskPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return tasks, nil
}
@ -867,17 +875,19 @@ func (kc *Catalog) DropAnalyzeTask(ctx context.Context, taskID typeutil.UniqueID
func (kc *Catalog) ListPartitionStatsInfos(ctx context.Context) ([]*datapb.PartitionStatsInfo, error) {
infos := make([]*datapb.PartitionStatsInfo, 0)
_, values, err := kc.MetaKv.LoadWithPrefix(PartitionStatsInfoPrefix)
if err != nil {
return nil, err
}
for _, value := range values {
applyFn := func(key []byte, value []byte) error {
info := &datapb.PartitionStatsInfo{}
err = proto.Unmarshal([]byte(value), info)
err := proto.Unmarshal(value, info)
if err != nil {
return nil, err
return err
}
infos = append(infos, info)
return nil
}
err := kc.MetaKv.WalkWithPrefix(PartitionStatsInfoPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return infos, nil
}

View File

@ -638,7 +638,9 @@ func TestChannelCP(t *testing.T) {
err := catalog.SaveChannelCheckpoint(context.TODO(), mockVChannel, pos)
assert.NoError(t, err)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{k}, []string{string(v)}, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f([]byte(k), v)
})
res, err := catalog.ListChannelCheckpoint(context.TODO())
assert.NoError(t, err)
assert.True(t, len(res) > 0)
@ -647,7 +649,7 @@ func TestChannelCP(t *testing.T) {
t.Run("ListChannelCheckpoint failed", func(t *testing.T) {
txn := mocks.NewMetaKv(t)
catalog := NewCatalog(txn, rootPath, "")
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("mock error"))
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock error"))
_, err = catalog.ListChannelCheckpoint(context.TODO())
assert.Error(t, err)
})
@ -692,7 +694,7 @@ func TestChannelCP(t *testing.T) {
assert.NoError(t, err)
txn.EXPECT().Remove(mock.Anything).Return(nil)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil)
err = catalog.DropChannelCheckpoint(context.TODO(), mockVChannel)
assert.NoError(t, err)
res, err := catalog.ListChannelCheckpoint(context.TODO())
@ -879,7 +881,7 @@ func TestCatalog_CreateIndex(t *testing.T) {
func TestCatalog_ListIndexes(t *testing.T) {
t.Run("success", func(t *testing.T) {
metakv := mocks.NewMetaKv(t)
metakv.EXPECT().LoadWithPrefix(mock.Anything).RunAndReturn(func(s string) ([]string, []string, error) {
metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
i := &indexpb.FieldIndex{
IndexInfo: &indexpb.IndexInfo{
CollectionID: 0,
@ -894,7 +896,7 @@ func TestCatalog_ListIndexes(t *testing.T) {
}
v, err := proto.Marshal(i)
assert.NoError(t, err)
return []string{"1"}, []string{string(v)}, nil
return f([]byte("1"), v)
})
catalog := &Catalog{
@ -907,7 +909,7 @@ func TestCatalog_ListIndexes(t *testing.T) {
t.Run("failed", func(t *testing.T) {
txn := mocks.NewMetaKv(t)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("error"))
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error"))
catalog := &Catalog{
MetaKv: txn,
}
@ -917,7 +919,9 @@ func TestCatalog_ListIndexes(t *testing.T) {
t.Run("unmarshal failed", func(t *testing.T) {
txn := mocks.NewMetaKv(t)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"1"}, []string{"invalid"}, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f([]byte("1"), []byte("invalid"))
})
catalog := &Catalog{
MetaKv: txn,
@ -1071,7 +1075,9 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) {
assert.NoError(t, err)
metakv := mocks.NewMetaKv(t)
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key"}, []string{string(v)}, nil)
metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f([]byte("key"), v)
})
catalog := &Catalog{
MetaKv: metakv,
}
@ -1083,7 +1089,7 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) {
t.Run("failed", func(t *testing.T) {
metakv := mocks.NewMetaKv(t)
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, errors.New("error"))
metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("error"))
catalog := &Catalog{
MetaKv: metakv,
}
@ -1094,7 +1100,9 @@ func TestCatalog_ListSegmentIndexes(t *testing.T) {
t.Run("unmarshal failed", func(t *testing.T) {
metakv := mocks.NewMetaKv(t)
metakv.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{"key"}, []string{"invalid"}, nil)
metakv.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f([]byte("key"), []byte("invalid"))
})
catalog := &Catalog{
MetaKv: metakv,
}
@ -1377,20 +1385,24 @@ func TestCatalog_Import(t *testing.T) {
txn := mocks.NewMetaKv(t)
value, err := proto.Marshal(job)
assert.NoError(t, err)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f(nil, value)
})
kc.MetaKv = txn
jobs, err := kc.ListImportJobs()
assert.NoError(t, err)
assert.Equal(t, 1, len(jobs))
txn = mocks.NewMetaKv(t)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f(nil, []byte("@#%#^#"))
})
kc.MetaKv = txn
_, err = kc.ListImportJobs()
assert.Error(t, err)
txn = mocks.NewMetaKv(t)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
kc.MetaKv = txn
_, err = kc.ListImportJobs()
assert.Error(t, err)
@ -1431,20 +1443,24 @@ func TestCatalog_Import(t *testing.T) {
txn := mocks.NewMetaKv(t)
value, err := proto.Marshal(pit)
assert.NoError(t, err)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f(nil, value)
})
kc.MetaKv = txn
tasks, err := kc.ListPreImportTasks()
assert.NoError(t, err)
assert.Equal(t, 1, len(tasks))
txn = mocks.NewMetaKv(t)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f(nil, []byte("@#%#^#"))
})
kc.MetaKv = txn
_, err = kc.ListPreImportTasks()
assert.Error(t, err)
txn = mocks.NewMetaKv(t)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
kc.MetaKv = txn
_, err = kc.ListPreImportTasks()
assert.Error(t, err)
@ -1485,20 +1501,24 @@ func TestCatalog_Import(t *testing.T) {
txn := mocks.NewMetaKv(t)
value, err := proto.Marshal(it)
assert.NoError(t, err)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{string(value)}, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f(nil, value)
})
kc.MetaKv = txn
tasks, err := kc.ListImportTasks()
assert.NoError(t, err)
assert.Equal(t, 1, len(tasks))
txn = mocks.NewMetaKv(t)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, []string{"@#%#^#"}, nil)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(_ string, _ int, f func([]byte, []byte) error) error {
return f(nil, []byte("@#%#^#"))
})
kc.MetaKv = txn
_, err = kc.ListImportTasks()
assert.Error(t, err)
txn = mocks.NewMetaKv(t)
txn.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr)
txn.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
kc.MetaKv = txn
_, err = kc.ListImportTasks()
assert.Error(t, err)

View File

@ -18,6 +18,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/compressor"
)
var paginationSize = 2000
var ErrInvalidKey = errors.New("invalid load info key")
const (
@ -104,51 +106,57 @@ func (s Catalog) RemoveResourceGroup(rgName string) error {
}
func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(CollectionLoadInfoPrefix)
if err != nil {
return nil, err
}
ret := make([]*querypb.CollectionLoadInfo, 0, len(values))
for _, v := range values {
ret := make([]*querypb.CollectionLoadInfo, 0)
applyFn := func(key []byte, value []byte) error {
info := querypb.CollectionLoadInfo{}
if err := proto.Unmarshal([]byte(v), &info); err != nil {
return nil, err
if err := proto.Unmarshal(value, &info); err != nil {
return err
}
ret = append(ret, &info)
return nil
}
err := s.cli.WalkWithPrefix(CollectionLoadInfoPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return ret, nil
}
func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(PartitionLoadInfoPrefix)
if err != nil {
return nil, err
}
ret := make(map[int64][]*querypb.PartitionLoadInfo)
for _, v := range values {
applyFn := func(key []byte, value []byte) error {
info := querypb.PartitionLoadInfo{}
if err := proto.Unmarshal([]byte(v), &info); err != nil {
return nil, err
if err := proto.Unmarshal(value, &info); err != nil {
return err
}
ret[info.GetCollectionID()] = append(ret[info.GetCollectionID()], &info)
return nil
}
err := s.cli.WalkWithPrefix(PartitionLoadInfoPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return ret, nil
}
func (s Catalog) GetReplicas() ([]*querypb.Replica, error) {
_, values, err := s.cli.LoadWithPrefix(ReplicaPrefix)
if err != nil {
return nil, err
}
ret := make([]*querypb.Replica, 0, len(values))
for _, v := range values {
ret := make([]*querypb.Replica, 0)
applyFn := func(key []byte, value []byte) error {
info := querypb.Replica{}
if err := proto.Unmarshal([]byte(v), &info); err != nil {
return nil, err
if err := proto.Unmarshal(value, &info); err != nil {
return err
}
ret = append(ret, &info)
return nil
}
err := s.cli.WalkWithPrefix(ReplicaPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
replicasV1, err := s.getReplicasFromV1()
@ -289,21 +297,23 @@ func (s Catalog) RemoveCollectionTarget(collectionID int64) error {
}
func (s Catalog) GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error) {
keys, values, err := s.cli.LoadWithPrefix(CollectionTargetPrefix)
if err != nil {
return nil, err
}
ret := make(map[int64]*querypb.CollectionTarget)
for i, v := range values {
applyFn := func(key []byte, value []byte) error {
var decompressed bytes.Buffer
compressor.ZstdDecompress(bytes.NewReader([]byte(v)), io.Writer(&decompressed))
compressor.ZstdDecompress(bytes.NewReader(value), io.Writer(&decompressed))
target := &querypb.CollectionTarget{}
if err := proto.Unmarshal(decompressed.Bytes(), target); err != nil {
// recover target from meta is a optimize policy, skip when failure happens
log.Warn("failed to unmarshal collection target", zap.String("key", keys[i]), zap.Error(err))
continue
log.Warn("failed to unmarshal collection target", zap.String("key", string(key)), zap.Error(err))
return nil
}
ret[target.GetCollectionID()] = target
return nil
}
err := s.cli.WalkWithPrefix(CollectionTargetPrefix, paginationSize, applyFn)
if err != nil {
return nil, err
}
return ret, nil

View File

@ -231,7 +231,7 @@ func (suite *CatalogTestSuite) TestCollectionTarget() {
mockStore := mocks.NewMetaKv(suite.T())
mockErr := errors.New("failed to access etcd")
mockStore.EXPECT().MultiSave(mock.Anything).Return(mockErr)
mockStore.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr)
mockStore.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
suite.catalog.cli = mockStore
err = suite.catalog.SaveCollectionTargets(&querypb.CollectionTarget{})