enhance: add mmap migration tool (#30909)

issue: #30908

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
pull/31600/head
Bingyi Sun 2024-03-25 15:51:09 +08:00 committed by GitHub
parent 8e661f791a
commit 0ac9bb4a9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 287 additions and 0 deletions

View File

@ -502,3 +502,11 @@ generate-mockery-pkg:
generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg
MMAP_MIGRATION_PATH = $(PWD)/cmd/tools/migration/mmap/tool
mmap-migration:
@echo "Building migration tool ..."
@source $(PWD)/scripts/setenv.sh && \
mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \
GO111MODULE=on $(GO) build -ldflags="-r $${RPATH} -X '$(OBJPREFIX).BuildTags=$(BUILD_TAGS)' -X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'" \
-tags dynamic -o $(INSTALL_PATH)/mmap-migration $(MMAP_MIGRATION_PATH)/main.go 1>/dev/null

View File

@ -0,0 +1,109 @@
package mmap
import (
"context"
"fmt"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
)
// In Milvus 2.3.x, querynode.MmapDirPath is used to enable mmap and save mmap files.
// In Milvus 2.4.x, mmap is enabled by setting collection properties and altering index.
// querynode.MmapDirPath is only used to save mmap files.
// Therefore, we need to read configs from 2.3.x and modify meta data if necessary.
type MmapMigration struct {
rootcoordMeta rootcoord.IMetaTable
tsoAllocator tso.Allocator
datacoordCatalog metastore.DataCoordCatalog
}
func (m *MmapMigration) Migrate(ctx context.Context) {
m.MigrateRootCoordCollection(ctx)
m.MigrateIndexCoordCollection(ctx)
}
func updateOrAddMmapKey(kv []*commonpb.KeyValuePair, key, value string) []*commonpb.KeyValuePair {
for _, pair := range kv {
if pair.Key == key {
pair.Value = value
return kv
}
}
return append(kv, &commonpb.KeyValuePair{Key: key, Value: value})
}
func (m *MmapMigration) MigrateRootCoordCollection(ctx context.Context) {
ts, err := m.tsoAllocator.GenerateTSO(1)
if err != nil {
panic(err)
}
db2Colls := m.rootcoordMeta.ListAllAvailCollections(ctx)
for did, collIds := range db2Colls {
db, err := m.rootcoordMeta.GetDatabaseByID(ctx, did, ts)
if err != nil {
panic(err)
}
for _, cid := range collIds {
collection, err := m.rootcoordMeta.GetCollectionByID(ctx, db.Name, cid, ts, false)
if err != nil {
panic(err)
}
newColl := collection.Clone()
newColl.Properties = updateOrAddMmapKey(newColl.Properties, common.MmapEnabledKey, "true")
fmt.Printf("migrate collection %v, %s\n", collection.CollectionID, collection.Name)
if err := m.rootcoordMeta.AlterCollection(ctx, collection, newColl, ts); err != nil {
panic(err)
}
}
}
}
func (m *MmapMigration) MigrateIndexCoordCollection(ctx context.Context) {
// load field indexes
fieldIndexes, err := m.datacoordCatalog.ListIndexes(ctx)
if err != nil {
panic(err)
}
getIndexType := func(indexParams []*commonpb.KeyValuePair) string {
for _, param := range indexParams {
if param.Key == common.IndexTypeKey {
return param.Value
}
}
return "invalid"
}
alteredIndexes := make([]*model.Index, 0)
for _, index := range fieldIndexes {
if !indexparamcheck.IsMmapSupported(getIndexType(index.IndexParams)) {
continue
}
fmt.Printf("migrate index, collection:%v, indexId: %v, indexName: %s\n", index.CollectionID, index.IndexID, index.IndexName)
newIndex := model.CloneIndex(index)
newIndex.UserIndexParams = updateOrAddMmapKey(newIndex.UserIndexParams, common.MmapEnabledKey, "true")
newIndex.IndexParams = updateOrAddMmapKey(newIndex.IndexParams, common.MmapEnabledKey, "true")
alteredIndexes = append(alteredIndexes, newIndex)
}
if err := m.datacoordCatalog.AlterIndexes(ctx, alteredIndexes); err != nil {
panic(err)
}
}
func NewMmapMigration(rootcoordMeta rootcoord.IMetaTable, tsoAllocator tso.Allocator, datacoordCatalog metastore.DataCoordCatalog) *MmapMigration {
return &MmapMigration{
rootcoordMeta: rootcoordMeta,
tsoAllocator: tsoAllocator,
datacoordCatalog: datacoordCatalog,
}
}

View File

@ -0,0 +1,170 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"time"
"github.com/milvus-io/milvus/cmd/tools/migration/mmap"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
kv_tikv "github.com/milvus-io/milvus/internal/kv/tikv"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tikv"
)
func main() {
configPtr := flag.String("config", "", "Path to the configuration file")
flag.Parse()
if *configPtr == "" {
log.Error("Config file path is required")
flag.Usage()
os.Exit(1)
}
fmt.Printf("Using config file: %s\n", *configPtr)
prepareParams(*configPtr)
if paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() == "" {
fmt.Println("mmap is not enabled")
return
}
fmt.Printf("MmapDirPath: %s\n", paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
allocator := prepareTsoAllocator()
rootCoordMeta := prepareRootCoordMeta(context.Background(), allocator)
dataCoordCatalog := prepareDataCoordCatalog()
m := mmap.NewMmapMigration(rootCoordMeta, allocator, dataCoordCatalog)
m.Migrate(context.Background())
}
func prepareParams(yamlFile string) *paramtable.ComponentParam {
paramtable.Get().Init(paramtable.NewBaseTableFromYamlOnly(yamlFile))
return paramtable.Get()
}
func prepareTsoAllocator() tso.Allocator {
var tsoKV kv.TxnKV
var kvPath string
if paramtable.Get().MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
tikvCli, err := tikv.GetTiKVClient(&paramtable.Get().TiKVCfg)
if err != nil {
panic(err)
}
kvPath = paramtable.Get().TiKVCfg.KvRootPath.GetValue()
tsoKV = tsoutil.NewTSOTiKVBase(tikvCli, kvPath, "gid")
} else {
etcdConfig := &paramtable.Get().EtcdCfg
etcdCli, err := etcd.CreateEtcdClient(
etcdConfig.UseEmbedEtcd.GetAsBool(),
etcdConfig.EtcdEnableAuth.GetAsBool(),
etcdConfig.EtcdAuthUserName.GetValue(),
etcdConfig.EtcdAuthPassword.GetValue(),
etcdConfig.EtcdUseSSL.GetAsBool(),
etcdConfig.Endpoints.GetAsStrings(),
etcdConfig.EtcdTLSCert.GetValue(),
etcdConfig.EtcdTLSKey.GetValue(),
etcdConfig.EtcdTLSCACert.GetValue(),
etcdConfig.EtcdTLSMinVersion.GetValue())
if err != nil {
panic(err)
}
kvPath = paramtable.Get().EtcdCfg.KvRootPath.GetValue()
tsoKV = tsoutil.NewTSOKVBase(etcdCli, kvPath, "gid")
}
tsoAllocator := tso.NewGlobalTSOAllocator("idTimestamp", tsoKV)
if err := tsoAllocator.Initialize(); err != nil {
panic(err)
}
return tsoAllocator
}
func metaKVCreator() (kv.MetaKv, error) {
if paramtable.Get().MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
tikvCli, err := tikv.GetTiKVClient(&paramtable.Get().TiKVCfg)
if err != nil {
panic(err)
}
return kv_tikv.NewTiKV(tikvCli, paramtable.Get().TiKVCfg.MetaRootPath.GetValue(),
kv_tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
}
etcdConfig := &paramtable.Get().EtcdCfg
etcdCli, err := etcd.CreateEtcdClient(
etcdConfig.UseEmbedEtcd.GetAsBool(),
etcdConfig.EtcdEnableAuth.GetAsBool(),
etcdConfig.EtcdAuthUserName.GetValue(),
etcdConfig.EtcdAuthPassword.GetValue(),
etcdConfig.EtcdUseSSL.GetAsBool(),
etcdConfig.Endpoints.GetAsStrings(),
etcdConfig.EtcdTLSCert.GetValue(),
etcdConfig.EtcdTLSKey.GetValue(),
etcdConfig.EtcdTLSCACert.GetValue(),
etcdConfig.EtcdTLSMinVersion.GetValue())
if err != nil {
panic(err)
}
return etcdkv.NewEtcdKV(etcdCli, paramtable.Get().EtcdCfg.MetaRootPath.GetValue(),
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
}
func prepareRootCoordMeta(ctx context.Context, allocator tso.Allocator) rootcoord.IMetaTable {
var catalog metastore.RootCoordCatalog
var err error
switch paramtable.Get().MetaStoreCfg.MetaStoreType.GetValue() {
case util.MetaStoreTypeEtcd:
var metaKV kv.MetaKv
var ss *kvmetestore.SuffixSnapshot
var err error
if metaKV, err = metaKVCreator(); err != nil {
panic(err)
}
if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, paramtable.Get().EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
panic(err)
}
catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
case util.MetaStoreTypeTiKV:
log.Info("Using tikv as meta storage.")
var metaKV kv.MetaKv
var ss *kvmetestore.SuffixSnapshot
var err error
if metaKV, err = metaKVCreator(); err != nil {
panic(err)
}
if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, paramtable.Get().TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
panic(err)
}
catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
default:
panic(fmt.Sprintf("MetaStoreType %s not supported", paramtable.Get().MetaStoreCfg.MetaStoreType.GetValue()))
}
var meta rootcoord.IMetaTable
if meta, err = rootcoord.NewMetaTable(ctx, catalog, allocator); err != nil {
panic(err)
}
return meta
}
func prepareDataCoordCatalog() metastore.DataCoordCatalog {
kv, err := metaKVCreator()
if err != nil {
panic(err)
}
return datacoord.NewCatalog(kv, "", "")
}