mirror of https://github.com/milvus-io/milvus.git
enhance: [10kcp] Speed up meta recovery (#38298)
Increase the batchSize in WalkWithPrefix operations to 10000. issue: https://github.com/milvus-io/milvus/issues/37630 pr: https://github.com/milvus-io/milvus/pull/38285 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/38312/head
parent
3d490aa158
commit
2fe6423552
|
@ -43,19 +43,25 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
var paginationSize = 2000
|
||||
|
||||
type Catalog struct {
|
||||
MetaKv kv.MetaKv
|
||||
MetaKv kv.MetaKv
|
||||
|
||||
paginationSize int
|
||||
ChunkManagerRootPath string
|
||||
metaRootpath string
|
||||
}
|
||||
|
||||
func NewCatalog(MetaKv kv.MetaKv, chunkManagerRootPath string, metaRootpath string) *Catalog {
|
||||
return &Catalog{MetaKv: MetaKv, ChunkManagerRootPath: chunkManagerRootPath, metaRootpath: metaRootpath}
|
||||
return &Catalog{
|
||||
MetaKv: MetaKv,
|
||||
paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(),
|
||||
ChunkManagerRootPath: chunkManagerRootPath,
|
||||
metaRootpath: metaRootpath,
|
||||
}
|
||||
}
|
||||
|
||||
func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) {
|
||||
|
@ -121,7 +127,7 @@ func (kc *Catalog) listSegments() ([]*datapb.SegmentInfo, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(SegmentPrefix+"/", paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(SegmentPrefix+"/", kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -204,7 +210,7 @@ func (kc *Catalog) listBinlogs(binlogType storage.BinlogType) (map[typeutil.Uniq
|
|||
return nil
|
||||
}
|
||||
|
||||
err = kc.MetaKv.WalkWithPrefix(logPathPrefix, paginationSize, applyFn)
|
||||
err = kc.MetaKv.WalkWithPrefix(logPathPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -483,7 +489,7 @@ func (kc *Catalog) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(ChannelCheckpointPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(ChannelCheckpointPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -568,7 +574,7 @@ func (kc *Catalog) ListIndexes(ctx context.Context) ([]*model.Index, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(util.FieldIndexPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(util.FieldIndexPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -644,7 +650,7 @@ func (kc *Catalog) ListSegmentIndexes(ctx context.Context) ([]*model.SegmentInde
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(util.SegmentIndexPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(util.SegmentIndexPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -698,7 +704,7 @@ func (kc *Catalog) ListImportJobs() ([]*datapb.ImportJob, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(ImportJobPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(ImportJobPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -732,7 +738,7 @@ func (kc *Catalog) ListPreImportTasks() ([]*datapb.PreImportTask, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(PreImportTaskPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(PreImportTaskPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -766,7 +772,7 @@ func (kc *Catalog) ListImportTasks() ([]*datapb.ImportTaskV2, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(ImportTaskPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(ImportTaskPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -806,7 +812,7 @@ func (kc *Catalog) ListCompactionTask(ctx context.Context) ([]*datapb.Compaction
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(CompactionTaskPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(CompactionTaskPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -845,7 +851,7 @@ func (kc *Catalog) ListAnalyzeTasks(ctx context.Context) ([]*indexpb.AnalyzeTask
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(AnalyzeTaskPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(AnalyzeTaskPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -885,7 +891,7 @@ func (kc *Catalog) ListPartitionStatsInfos(ctx context.Context) ([]*datapb.Parti
|
|||
return nil
|
||||
}
|
||||
|
||||
err := kc.MetaKv.WalkWithPrefix(PartitionStatsInfoPrefix, paginationSize, applyFn)
|
||||
err := kc.MetaKv.WalkWithPrefix(PartitionStatsInfoPrefix, kc.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -16,10 +16,9 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/pkg/kv"
|
||||
"github.com/milvus-io/milvus/pkg/util/compressor"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
var paginationSize = 2000
|
||||
|
||||
var ErrInvalidKey = errors.New("invalid load info key")
|
||||
|
||||
const (
|
||||
|
@ -35,12 +34,14 @@ const (
|
|||
)
|
||||
|
||||
type Catalog struct {
|
||||
cli kv.MetaKv
|
||||
cli kv.MetaKv
|
||||
paginationSize int
|
||||
}
|
||||
|
||||
func NewCatalog(cli kv.MetaKv) Catalog {
|
||||
return Catalog{
|
||||
cli: cli,
|
||||
cli: cli,
|
||||
paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,7 +117,7 @@ func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := s.cli.WalkWithPrefix(CollectionLoadInfoPrefix, paginationSize, applyFn)
|
||||
err := s.cli.WalkWithPrefix(CollectionLoadInfoPrefix, s.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -135,7 +136,7 @@ func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error)
|
|||
return nil
|
||||
}
|
||||
|
||||
err := s.cli.WalkWithPrefix(PartitionLoadInfoPrefix, paginationSize, applyFn)
|
||||
err := s.cli.WalkWithPrefix(PartitionLoadInfoPrefix, s.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -154,7 +155,7 @@ func (s Catalog) GetReplicas() ([]*querypb.Replica, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := s.cli.WalkWithPrefix(ReplicaPrefix, paginationSize, applyFn)
|
||||
err := s.cli.WalkWithPrefix(ReplicaPrefix, s.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -311,7 +312,7 @@ func (s Catalog) GetCollectionTargets() (map[int64]*querypb.CollectionTarget, er
|
|||
return nil
|
||||
}
|
||||
|
||||
err := s.cli.WalkWithPrefix(CollectionTargetPrefix, paginationSize, applyFn)
|
||||
err := s.cli.WalkWithPrefix(CollectionTargetPrefix, s.paginationSize, applyFn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -40,11 +40,8 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
var (
|
||||
// SuffixSnapshotTombstone special value for tombstone mark
|
||||
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
|
||||
PaginationSize = 5000
|
||||
)
|
||||
// SuffixSnapshotTombstone special value for tombstone mark
|
||||
var SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
|
||||
|
||||
// IsTombstone used in migration tool also.
|
||||
func IsTombstone(value string) bool {
|
||||
|
@ -82,6 +79,8 @@ type SuffixSnapshot struct {
|
|||
// snapshotLen pre calculated offset when parsing snapshot key
|
||||
snapshotLen int
|
||||
|
||||
paginationSize int
|
||||
|
||||
closeGC chan struct{}
|
||||
}
|
||||
|
||||
|
@ -116,6 +115,7 @@ func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSna
|
|||
snapshotLen: snapshotLen,
|
||||
rootPrefix: root,
|
||||
rootLen: rootLen,
|
||||
paginationSize: paramtable.Get().MetaStoreCfg.PaginationSize.GetAsInt(),
|
||||
closeGC: make(chan struct{}, 1),
|
||||
}
|
||||
go ss.startBackgroundGC()
|
||||
|
@ -447,7 +447,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
|
|||
return nil
|
||||
}
|
||||
|
||||
err := ss.MetaKv.WalkWithPrefix(key, PaginationSize, applyFn)
|
||||
err := ss.MetaKv.WalkWithPrefix(key, ss.paginationSize, applyFn)
|
||||
return fks, fvs, err
|
||||
}
|
||||
ss.Lock()
|
||||
|
@ -470,7 +470,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
|
|||
resultValues = append(resultValues, value)
|
||||
}
|
||||
|
||||
err := ss.MetaKv.WalkWithPrefix(prefix, PaginationSize, func(k []byte, v []byte) error {
|
||||
err := ss.MetaKv.WalkWithPrefix(prefix, ss.paginationSize, func(k []byte, v []byte) error {
|
||||
sKey := string(k)
|
||||
sValue := string(v)
|
||||
|
||||
|
@ -640,7 +640,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {
|
|||
}
|
||||
|
||||
// Walk through all keys with the snapshot prefix
|
||||
err := ss.MetaKv.WalkWithPrefix(ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error {
|
||||
err := ss.MetaKv.WalkWithPrefix(ss.snapshotPrefix, ss.paginationSize, func(k []byte, v []byte) error {
|
||||
key := ss.hideRootPrefix(string(k))
|
||||
ts, ok := ss.isTSKey(key)
|
||||
if !ok {
|
||||
|
|
|
@ -458,6 +458,7 @@ type MetaStoreConfig struct {
|
|||
MetaStoreType ParamItem `refreshable:"false"`
|
||||
SnapshotTTLSeconds ParamItem `refreshable:"true"`
|
||||
SnapshotReserveTimeSeconds ParamItem `refreshable:"true"`
|
||||
PaginationSize ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
||||
func (p *MetaStoreConfig) Init(base *BaseTable) {
|
||||
|
@ -488,6 +489,14 @@ func (p *MetaStoreConfig) Init(base *BaseTable) {
|
|||
}
|
||||
p.SnapshotReserveTimeSeconds.Init(base.mgr)
|
||||
|
||||
p.PaginationSize = ParamItem{
|
||||
Key: "metastore.paginationSize",
|
||||
Version: "2.5.1",
|
||||
DefaultValue: "10000",
|
||||
Doc: `limits the number of results to return from metastore.`,
|
||||
}
|
||||
p.PaginationSize.Init(base.mgr)
|
||||
|
||||
// TODO: The initialization operation of metadata storage is called in the initialization phase of every node.
|
||||
// There should be a single initialization operation for meta store, then move the metrics registration to there.
|
||||
metrics.RegisterMetaType(p.MetaStoreType.GetValue())
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/config"
|
||||
"github.com/milvus-io/milvus/pkg/util"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
)
|
||||
|
||||
|
@ -203,4 +204,13 @@ func TestServiceParam(t *testing.T) {
|
|||
|
||||
t.Logf("Minio rootpath = %s", Params.RootPath.GetValue())
|
||||
})
|
||||
|
||||
t.Run("test metastore config", func(t *testing.T) {
|
||||
Params := &SParams.MetaStoreCfg
|
||||
|
||||
assert.Equal(t, util.MetaStoreTypeEtcd, Params.MetaStoreType.GetValue())
|
||||
assert.Equal(t, 86400*time.Second, Params.SnapshotTTLSeconds.GetAsDuration(time.Second))
|
||||
assert.Equal(t, 3600*time.Second, Params.SnapshotReserveTimeSeconds.GetAsDuration(time.Second))
|
||||
assert.Equal(t, 10000, Params.PaginationSize.GetAsInt())
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue