From a8a074162f448596a1d53c2d0537d5b05956e6e0 Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Wed, 12 Oct 2022 11:37:23 +0800 Subject: [PATCH] Add meta migration tool (#19709) Signed-off-by: longjiquan Signed-off-by: longjiquan --- Makefile | 6 + cmd/tools/migration/backend/backend.go | 39 ++ cmd/tools/migration/backend/backup_header.go | 35 ++ cmd/tools/migration/backend/backup_restore.go | 143 ++++++ .../migration/backend/backup_restore_test.go | 31 ++ cmd/tools/migration/backend/etcd.go | 29 ++ cmd/tools/migration/backend/etcd210.go | 417 ++++++++++++++++++ cmd/tools/migration/backend/etcd220.go | 123 ++++++ cmd/tools/migration/command/backup.go | 20 + cmd/tools/migration/command/config.go | 24 + cmd/tools/migration/command/help.go | 12 + cmd/tools/migration/command/main.go | 35 ++ cmd/tools/migration/command/rollback.go | 20 + cmd/tools/migration/command/run.go | 24 + cmd/tools/migration/configs/config.go | 98 ++++ cmd/tools/migration/console/console.go | 57 +++ cmd/tools/migration/console/console_test.go | 21 + cmd/tools/migration/example.yaml | 44 ++ cmd/tools/migration/legacy/constant.go | 11 + cmd/tools/migration/legacy/legacy.proto | 35 ++ .../migration/legacy/legacypb/legacy.pb.go | 280 ++++++++++++ cmd/tools/migration/legacy/util.go | 19 + cmd/tools/migration/main.go | 11 + cmd/tools/migration/meta/210_to_220.go | 213 +++++++++ cmd/tools/migration/meta/meta.go | 17 + cmd/tools/migration/meta/meta210.go | 312 +++++++++++++ cmd/tools/migration/meta/meta220.go | 248 +++++++++++ cmd/tools/migration/migration/210_to_220.go | 16 + cmd/tools/migration/migration/constant.go | 18 + cmd/tools/migration/migration/migration.go | 21 + cmd/tools/migration/migration/migrator.go | 32 ++ cmd/tools/migration/migration/runner.go | 214 +++++++++ cmd/tools/migration/utils/util.go | 49 ++ cmd/tools/migration/utils/util_test.go | 40 ++ cmd/tools/migration/versions/version.go | 32 ++ cmd/tools/migration/versions/version_test.go | 77 ++++ go.mod | 21 +- go.sum | 11 +- 
.../metastore/kv/indexcoord/kv_catalog.go | 16 +- internal/metastore/kv/rootcoord/kv_catalog.go | 62 +-- .../metastore/kv/rootcoord/kv_catalog_test.go | 4 +- .../kv/rootcoord/rootcoord_constant.go | 7 +- .../metastore/kv/rootcoord/suffix_snapshot.go | 22 +- internal/metastore/model/collection.go | 65 ++- internal/rootcoord/constrant.go | 2 - internal/rootcoord/meta_table.go | 6 - internal/rootcoord/root_coord.go | 2 +- internal/util/paramtable/base_table.go | 8 + internal/util/paramtable/base_table_test.go | 8 + internal/util/paramtable/service_param.go | 4 + internal/util/sessionutil/session_util.go | 11 + .../util/sessionutil/session_util_test.go | 17 + scripts/proto_gen_go.sh | 5 + 53 files changed, 3008 insertions(+), 86 deletions(-) create mode 100644 cmd/tools/migration/backend/backend.go create mode 100644 cmd/tools/migration/backend/backup_header.go create mode 100644 cmd/tools/migration/backend/backup_restore.go create mode 100644 cmd/tools/migration/backend/backup_restore_test.go create mode 100644 cmd/tools/migration/backend/etcd.go create mode 100644 cmd/tools/migration/backend/etcd210.go create mode 100644 cmd/tools/migration/backend/etcd220.go create mode 100644 cmd/tools/migration/command/backup.go create mode 100644 cmd/tools/migration/command/config.go create mode 100644 cmd/tools/migration/command/help.go create mode 100644 cmd/tools/migration/command/main.go create mode 100644 cmd/tools/migration/command/rollback.go create mode 100644 cmd/tools/migration/command/run.go create mode 100644 cmd/tools/migration/configs/config.go create mode 100644 cmd/tools/migration/console/console.go create mode 100644 cmd/tools/migration/console/console_test.go create mode 100644 cmd/tools/migration/example.yaml create mode 100644 cmd/tools/migration/legacy/constant.go create mode 100644 cmd/tools/migration/legacy/legacy.proto create mode 100644 cmd/tools/migration/legacy/legacypb/legacy.pb.go create mode 100644 cmd/tools/migration/legacy/util.go create mode 
100644 cmd/tools/migration/main.go create mode 100644 cmd/tools/migration/meta/210_to_220.go create mode 100644 cmd/tools/migration/meta/meta.go create mode 100644 cmd/tools/migration/meta/meta210.go create mode 100644 cmd/tools/migration/meta/meta220.go create mode 100644 cmd/tools/migration/migration/210_to_220.go create mode 100644 cmd/tools/migration/migration/constant.go create mode 100644 cmd/tools/migration/migration/migration.go create mode 100644 cmd/tools/migration/migration/migrator.go create mode 100644 cmd/tools/migration/migration/runner.go create mode 100644 cmd/tools/migration/utils/util.go create mode 100644 cmd/tools/migration/utils/util_test.go create mode 100644 cmd/tools/migration/versions/version.go create mode 100644 cmd/tools/migration/versions/version_test.go diff --git a/Makefile b/Makefile index 496d0ac060..54aa9e41ec 100644 --- a/Makefile +++ b/Makefile @@ -79,6 +79,12 @@ binlog: @source $(PWD)/scripts/setenv.sh && \ mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/tools/binlog/main.go 1>/dev/null +MIGRATION_PATH = $(PWD)/cmd/tools/migration +migration: + @echo "Building migration tool ..." 
+ @source $(PWD)/scripts/setenv.sh && \ + mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/migration $(MIGRATION_PATH)/main.go 1>/dev/null + BUILD_TAGS = $(shell git describe --tags --always --dirty="-dev") BUILD_TIME = $(shell date -u) GIT_COMMIT = $(shell git rev-parse --short HEAD) diff --git a/cmd/tools/migration/backend/backend.go b/cmd/tools/migration/backend/backend.go new file mode 100644 index 0000000000..316166173c --- /dev/null +++ b/cmd/tools/migration/backend/backend.go @@ -0,0 +1,39 @@ +package backend + +import ( + "fmt" + + "github.com/blang/semver/v4" + + "github.com/milvus-io/milvus/cmd/tools/migration/configs" + + "github.com/milvus-io/milvus/cmd/tools/migration/versions" + + "github.com/milvus-io/milvus/cmd/tools/migration/meta" + "github.com/milvus-io/milvus/internal/util" +) + +type Backend interface { + Load() (*meta.Meta, error) + Save(meta *meta.Meta) error + Clean() error + Backup(meta *meta.Meta, backupFile string) error + Restore(backupFile string) error +} + +func NewBackend(cfg *configs.MilvusConfig, version string) (Backend, error) { + switch cfg.MetaStoreCfg.MetaStoreType { + case util.MetaStoreTypeMysql: + return nil, fmt.Errorf("%s is not supported now", cfg.MetaStoreCfg.MetaStoreType) + } + v, err := semver.Parse(version) + if err != nil { + return nil, err + } + if versions.Range21x(v) { + return newEtcd210(cfg) + } else if versions.Range22x(v) { + return newEtcd220(cfg) + } + return nil, fmt.Errorf("version not supported: %s", version) +} diff --git a/cmd/tools/migration/backend/backup_header.go b/cmd/tools/migration/backend/backup_header.go new file mode 100644 index 0000000000..263613b549 --- /dev/null +++ b/cmd/tools/migration/backend/backup_header.go @@ -0,0 +1,35 @@ +package backend + +import "github.com/golang/protobuf/proto" + +type BackupHeaderVersion int32 + +const ( + BackupHeaderVersionV1 BackupHeaderVersion = iota +) + +// BackupHeader stores etcd backup header 
// BackupHeader stores etcd backup header information.
//
// It carries hand-written protobuf struct tags and, together with the Reset,
// String and ProtoMessage methods defined next to it, is passed to
// proto.Marshal / proto.Unmarshal by BackupFile.WriteHeader and
// BackupFile.ReadHeader as the first record of a backup file.
type BackupHeader struct {
	// Version is the version number of the backup format.
	Version BackupHeaderVersion `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
	// Instance is the instance name, used as the rootPath part of the key prefix.
	Instance string `protobuf:"bytes,2,opt,name=instance,proto3" json:"instance,omitempty"`
	// MetaPath is the meta path component used in keys.
	MetaPath string `protobuf:"bytes,3,opt,name=meta_path,proto3" json:"meta_path,omitempty"`
	// Entries records the number of key-value pairs in the backup.
	Entries int64 `protobuf:"varint,4,opt,name=entries,proto3" json:"entries,omitempty"`
	// Component is the backup target.
	Component string `protobuf:"bytes,5,opt,name=component,proto3" json:"component,omitempty"`
	// Extra is reserved for future properties; it is excluded from JSON output.
	Extra []byte `protobuf:"bytes,6,opt,name=extra,proto3" json:"-"`
}
+} + +func (f *BackupFile) writeHeaderLength(l uint64) { + lengthBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(lengthBytes, l) + f.writeBytes(lengthBytes) +} + +func (f *BackupFile) writeHeaderBytes(header []byte) { + f.writeBytes(header) +} + +func (f *BackupFile) WriteHeader(header *BackupHeader) error { + f.Reset() + marshaledHeader, err := proto.Marshal(header) + if err != nil { + return err + } + l := len(marshaledHeader) + f.writeHeaderLength(uint64(l)) + f.writeHeaderBytes(marshaledHeader) + return nil +} + +func (f *BackupFile) writeEntryLength(l uint64) { + f.writeHeaderLength(l) +} + +func (f *BackupFile) writeEntryBytes(entry []byte) { + f.writeBytes(entry) +} + +func (f *BackupFile) WriteEntry(k, v string) error { + entry := &commonpb.KeyDataPair{ + Key: k, + Data: []byte(v), + } + marshaledEntry, err := proto.Marshal(entry) + if err != nil { + return err + } + l := len(marshaledEntry) + f.writeEntryLength(uint64(l)) + f.writeEntryBytes(marshaledEntry) + return nil +} + +func (f *BackupFile) ReadHeader() (header *BackupHeader, headerLength uint64, err error) { + if len(*f) < 8 { + return nil, 0, fmt.Errorf("invalid backup file, cannot read header length") + } + headerLength = binary.LittleEndian.Uint64((*f)[:8]) + if uint64(len(*f)) < 8+headerLength { + return nil, 0, fmt.Errorf("invalid backup file, cannot read header") + } + header = &BackupHeader{} + if err := proto.Unmarshal((*f)[8:headerLength+8], header); err != nil { + return nil, 0, fmt.Errorf("invalid backup file, cannot read header: %s", err.Error()) + } + return header, headerLength, nil +} + +func (f *BackupFile) ReadEntryFromPos(pos uint64) (entryLength uint64, entry *commonpb.KeyDataPair, err error) { + if pos == uint64(len(*f)) { + return 0, nil, io.EOF + } + if uint64(len(*f)) < pos+8 { + return 0, nil, fmt.Errorf("invalid backup file, cannot read entry length") + } + entryLength = binary.LittleEndian.Uint64((*f)[pos : pos+8]) + if uint64(len(*f)) < pos+8+entryLength { + 
return 0, nil, fmt.Errorf("invalid backup file, cannot read entry") + } + entry = &commonpb.KeyDataPair{} + if err := proto.Unmarshal((*f)[pos+8:pos+8+entryLength], entry); err != nil { + return 0, nil, fmt.Errorf("invalid backup file, cannot read entry: %s", err.Error()) + } + return entryLength, entry, nil +} + +func (f *BackupFile) DeSerialize() (header *BackupHeader, kvs map[string]string, err error) { + pos := uint64(0) + header, headerLength, err := f.ReadHeader() + if err != nil { + return nil, nil, err + } + pos += 8 + headerLength + kvs = make(map[string]string) + for { + entryLength, entry, err := f.ReadEntryFromPos(pos) + if err == io.EOF { + return header, kvs, nil + } + if err != nil { + return nil, nil, err + } + kvs[entry.GetKey()] = string(entry.GetData()) + pos += 8 + entryLength + } +} + +type BackupCodec struct{} + +func (c *BackupCodec) Serialize(header *BackupHeader, kvs map[string]string) (BackupFile, error) { + file := make(BackupFile, 0) + header.Entries = int64(len(kvs)) + if err := file.WriteHeader(header); err != nil { + return nil, err + } + for k, v := range kvs { + if err := file.WriteEntry(k, v); err != nil { + return nil, err + } + } + return file, nil +} + +func (c *BackupCodec) DeSerialize(file BackupFile) (header *BackupHeader, kvs map[string]string, err error) { + return file.DeSerialize() +} + +func NewBackupCodec() *BackupCodec { + return &BackupCodec{} +} diff --git a/cmd/tools/migration/backend/backup_restore_test.go b/cmd/tools/migration/backend/backup_restore_test.go new file mode 100644 index 0000000000..847f17ba1b --- /dev/null +++ b/cmd/tools/migration/backend/backup_restore_test.go @@ -0,0 +1,31 @@ +package backend + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBackupCodec_Serialize(t *testing.T) { + header := &BackupHeader{ + Version: BackupHeaderVersionV1, + Instance: "/by-dev", + MetaPath: "meta", + Entries: 0, + Component: "", + Extra: nil, + } + kvs := map[string]string{ 
+ "1": "1", + "2": "2", + "3": "3", + } + codec := &BackupCodec{} + file, err := codec.Serialize(header, kvs) + assert.NoError(t, err) + gotHeader, gotEntries, err := codec.DeSerialize(file) + assert.NoError(t, err) + assert.True(t, reflect.DeepEqual(header, gotHeader)) + assert.True(t, reflect.DeepEqual(kvs, gotEntries)) +} diff --git a/cmd/tools/migration/backend/etcd.go b/cmd/tools/migration/backend/etcd.go new file mode 100644 index 0000000000..8f6aeaeed0 --- /dev/null +++ b/cmd/tools/migration/backend/etcd.go @@ -0,0 +1,29 @@ +package backend + +import ( + "github.com/milvus-io/milvus/cmd/tools/migration/configs" + "github.com/milvus-io/milvus/internal/kv" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/util/etcd" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type etcdBasedBackend struct { + cfg *configs.MilvusConfig + txn kv.MetaKv + etcdCli *clientv3.Client +} + +func (b etcdBasedBackend) CleanWithPrefix(prefix string) error { + return b.txn.RemoveWithPrefix(prefix) +} + +func newEtcdBasedBackend(cfg *configs.MilvusConfig) (*etcdBasedBackend, error) { + etcdCli, err := etcd.GetEtcdClient(cfg.EtcdCfg) + if err != nil { + return nil, err + } + txn := etcdkv.NewEtcdKV(etcdCli, cfg.EtcdCfg.MetaRootPath) + b := &etcdBasedBackend{cfg: cfg, etcdCli: etcdCli, txn: txn} + return b, nil +} diff --git a/cmd/tools/migration/backend/etcd210.go b/cmd/tools/migration/backend/etcd210.go new file mode 100644 index 0000000000..648108be8d --- /dev/null +++ b/cmd/tools/migration/backend/etcd210.go @@ -0,0 +1,417 @@ +package backend + +import ( + "fmt" + "io/ioutil" + "path" + "strconv" + "strings" + + "github.com/milvus-io/milvus/cmd/tools/migration/configs" + "github.com/milvus-io/milvus/cmd/tools/migration/legacy" + + "github.com/milvus-io/milvus/cmd/tools/migration/legacy/legacypb" + + "github.com/golang/protobuf/proto" + "github.com/milvus-io/milvus/cmd/tools/migration/console" + 
"github.com/milvus-io/milvus/cmd/tools/migration/meta" + "github.com/milvus-io/milvus/cmd/tools/migration/utils" + "github.com/milvus-io/milvus/cmd/tools/migration/versions" + "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" + pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +// etcd210 implements Backend. +type etcd210 struct { + Backend + *etcdBasedBackend +} + +func newEtcd210(cfg *configs.MilvusConfig) (*etcd210, error) { + etcdBackend, err := newEtcdBasedBackend(cfg) + if err != nil { + return nil, err + } + return &etcd210{etcdBasedBackend: etcdBackend}, nil +} + +func (b etcd210) loadTtAliases() (meta.TtAliasesMeta210, error) { + ttAliases := make(meta.TtAliasesMeta210) + prefix := path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionAliasMetaPrefix210) + keys, values, err := b.txn.LoadWithPrefix(prefix) + if err != nil { + return nil, err + } + if len(keys) != len(values) { + return nil, fmt.Errorf("length mismatch") + } + l := len(keys) + for i := 0; i < l; i++ { + tsKey := keys[i] + tsValue := values[i] + valueIsTombstone := rootcoord.IsTombstone(tsValue) + var aliasInfo = &pb.CollectionInfo{} // alias stored in collection info. 
+ if valueIsTombstone { + aliasInfo = nil + } else { + if err := proto.Unmarshal([]byte(tsValue), aliasInfo); err != nil { + return nil, err + } + } + key, ts, err := utils.SplitBySeparator(tsKey) + if err != nil { + return nil, err + } + ttAliases.AddAlias(utils.GetFileName(key), aliasInfo, ts) + } + return ttAliases, nil +} + +func (b etcd210) loadAliases() (meta.AliasesMeta210, error) { + aliases := make(meta.AliasesMeta210) + prefix := rootcoord.CollectionAliasMetaPrefix210 + keys, values, err := b.txn.LoadWithPrefix(prefix) + if err != nil { + return nil, err + } + if len(keys) != len(values) { + return nil, fmt.Errorf("length mismatch") + } + l := len(keys) + for i := 0; i < l; i++ { + key := keys[i] + value := values[i] + valueIsTombstone := rootcoord.IsTombstone(value) + var aliasInfo = &pb.CollectionInfo{} // alias stored in collection info. + if valueIsTombstone { + aliasInfo = nil + } else { + if err := proto.Unmarshal([]byte(value), aliasInfo); err != nil { + return nil, err + } + } + aliases.AddAlias(utils.GetFileName(key), aliasInfo) + } + return aliases, nil +} + +func (b etcd210) loadTtCollections() (meta.TtCollectionsMeta210, error) { + ttCollections := make(meta.TtCollectionsMeta210) + prefix := path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionMetaPrefix) + keys, values, err := b.txn.LoadWithPrefix(prefix) + if err != nil { + return nil, err + } + if len(keys) != len(values) { + return nil, fmt.Errorf("length mismatch") + } + l := len(keys) + for i := 0; i < l; i++ { + tsKey := keys[i] + tsValue := values[i] + + // ugly here, since alias and collections have same prefix. 
+ if strings.Contains(tsKey, rootcoord.CollectionAliasMetaPrefix210) { + continue + } + + valueIsTombstone := rootcoord.IsTombstone(tsValue) + var coll = &pb.CollectionInfo{} + if valueIsTombstone { + coll = nil + } else { + if err := proto.Unmarshal([]byte(tsValue), coll); err != nil { + return nil, err + } + } + key, ts, err := utils.SplitBySeparator(tsKey) + if err != nil { + return nil, err + } + collectionID, err := strconv.Atoi(utils.GetFileName(key)) + if err != nil { + return nil, err + } + ttCollections.AddCollection(typeutil.UniqueID(collectionID), coll, ts) + } + return ttCollections, nil +} + +func (b etcd210) loadCollections() (meta.CollectionsMeta210, error) { + collections := make(meta.CollectionsMeta210) + prefix := rootcoord.CollectionMetaPrefix + keys, values, err := b.txn.LoadWithPrefix(prefix) + if err != nil { + return nil, err + } + if len(keys) != len(values) { + return nil, fmt.Errorf("length mismatch") + } + l := len(keys) + for i := 0; i < l; i++ { + key := keys[i] + value := values[i] + + // ugly here, since alias and collections have same prefix. 
+ if strings.Contains(key, rootcoord.CollectionAliasMetaPrefix210) { + continue + } + + valueIsTombstone := rootcoord.IsTombstone(value) + var coll = &pb.CollectionInfo{} + if valueIsTombstone { + coll = nil + } else { + if err := proto.Unmarshal([]byte(value), coll); err != nil { + return nil, err + } + } + collectionID, err := strconv.Atoi(utils.GetFileName(key)) + if err != nil { + return nil, err + } + collections.AddCollection(typeutil.UniqueID(collectionID), coll) + } + return collections, nil +} + +func parseCollectionIndexKey(key string) (collectionID, indexID typeutil.UniqueID, err error) { + ss := strings.Split(key, "/") + l := len(ss) + if l < 2 { + return 0, 0, fmt.Errorf("failed to parse collection index key: %s", key) + } + index, err := strconv.Atoi(ss[l-1]) + if err != nil { + return 0, 0, err + } + collection, err := strconv.Atoi(ss[l-2]) + if err != nil { + return 0, 0, err + } + return typeutil.UniqueID(collection), typeutil.UniqueID(index), nil +} + +func (b etcd210) loadCollectionIndexes() (meta.CollectionIndexesMeta210, error) { + collectionIndexes := make(meta.CollectionIndexesMeta210) + prefix := legacy.IndexMetaBefore220Prefix + keys, values, err := b.txn.LoadWithPrefix(prefix) + if err != nil { + return nil, err + } + if len(keys) != len(values) { + return nil, fmt.Errorf("length mismatch") + } + l := len(keys) + for i := 0; i < l; i++ { + key := keys[i] + value := values[i] + + var index = &pb.IndexInfo{} + if err := proto.Unmarshal([]byte(value), index); err != nil { + return nil, err + } + collectionID, indexID, err := parseCollectionIndexKey(key) + if err != nil { + return nil, err + } + collectionIndexes.AddIndex(collectionID, indexID, index) + } + return collectionIndexes, nil +} + +func (b etcd210) loadSegmentIndexes() (meta.SegmentIndexesMeta210, error) { + segmentIndexes := make(meta.SegmentIndexesMeta210) + prefix := legacy.SegmentIndexPrefixBefore220 + keys, values, err := b.txn.LoadWithPrefix(prefix) + if err != nil { + return 
nil, err + } + if len(keys) != len(values) { + return nil, fmt.Errorf("length mismatch") + } + l := len(keys) + for i := 0; i < l; i++ { + value := values[i] + + var index = &pb.SegmentIndexInfo{} + if err := proto.Unmarshal([]byte(value), index); err != nil { + return nil, err + } + segmentIndexes.AddIndex(index.GetSegmentID(), index.GetIndexID(), index) + } + return segmentIndexes, nil +} + +func (b etcd210) loadIndexBuildMeta() (meta.IndexBuildMeta210, error) { + indexBuildMeta := make(meta.IndexBuildMeta210) + prefix := legacy.IndexBuildPrefixBefore220 + keys, values, err := b.txn.LoadWithPrefix(prefix) + if err != nil { + return nil, err + } + if len(keys) != len(values) { + return nil, fmt.Errorf("length mismatch") + } + l := len(keys) + for i := 0; i < l; i++ { + value := values[i] + + var record = &legacypb.IndexMeta{} + if err := proto.Unmarshal([]byte(value), record); err != nil { + return nil, err + } + indexBuildMeta.AddRecord(record.GetIndexBuildID(), record) + } + return indexBuildMeta, nil +} + +func (b etcd210) loadLastDDLRecords() (meta.LastDDLRecords, error) { + records := make(meta.LastDDLRecords) + prefixes := []string{ + legacy.DDOperationPrefixBefore220, + legacy.DDMsgSendPrefixBefore220, + path.Join(rootcoord.SnapshotPrefix, legacy.DDOperationPrefixBefore220), + path.Join(rootcoord.SnapshotPrefix, legacy.DDMsgSendPrefixBefore220), + } + for _, prefix := range prefixes { + keys, values, err := b.txn.LoadWithPrefix(prefix) + if err != nil { + return nil, err + } + if len(keys) != len(values) { + return nil, fmt.Errorf("length mismatch") + } + for i, k := range keys { + records.AddRecord(k, values[i]) + } + } + return records, nil +} + +func (b etcd210) Load() (*meta.Meta, error) { + ttCollections, err := b.loadTtCollections() + if err != nil { + return nil, err + } + collections, err := b.loadCollections() + if err != nil { + return nil, err + } + ttAliases, err := b.loadTtAliases() + if err != nil { + return nil, err + } + aliases, err := 
b.loadAliases() + if err != nil { + return nil, err + } + collectionIndexes, err := b.loadCollectionIndexes() + if err != nil { + return nil, err + } + segmentIndexes, err := b.loadSegmentIndexes() + if err != nil { + return nil, err + } + indexBuildMeta, err := b.loadIndexBuildMeta() + if err != nil { + return nil, err + } + lastDdlRecords, err := b.loadLastDDLRecords() + if err != nil { + return nil, err + } + return &meta.Meta{ + Version: versions.Version210, + Meta210: &meta.All210{ + TtCollections: ttCollections, + Collections: collections, + TtAliases: ttAliases, + Aliases: aliases, + CollectionIndexes: collectionIndexes, + SegmentIndexes: segmentIndexes, + IndexBuildMeta: indexBuildMeta, + LastDDLRecords: lastDdlRecords, + }, + }, nil +} + +func lineCleanPrefix(prefix string) { + fmt.Printf("prefix %s will be removed!\n", prefix) +} + +func (b etcd210) Clean() error { + prefixes := []string{ + rootcoord.CollectionMetaPrefix, + path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionMetaPrefix), + + rootcoord.CollectionAliasMetaPrefix210, + path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionAliasMetaPrefix210), + + legacy.SegmentIndexPrefixBefore220, + + legacy.IndexMetaBefore220Prefix, + + legacy.IndexBuildPrefixBefore220, + + legacy.DDMsgSendPrefixBefore220, + path.Join(rootcoord.SnapshotPrefix, legacy.DDMsgSendPrefixBefore220), + legacy.DDOperationPrefixBefore220, + path.Join(rootcoord.SnapshotPrefix, legacy.DDOperationPrefixBefore220), + } + for _, prefix := range prefixes { + if err := b.CleanWithPrefix(prefix); err != nil { + return err + } + lineCleanPrefix(prefix) + } + return nil +} + +func (b etcd210) Backup(meta *meta.Meta, backupFile string) error { + saves := meta.Meta210.GenerateSaves() + codec := NewBackupCodec() + var instance, metaPath string + metaRootPath := b.cfg.EtcdCfg.MetaRootPath + parts := strings.Split(metaRootPath, "/") + if len(parts) > 1 { + metaPath = parts[len(parts)-1] + instance = path.Join(parts[:len(parts)-1]...) 
+ } else { + instance = metaRootPath + } + header := &BackupHeader{ + Version: BackupHeaderVersionV1, + Instance: instance, + MetaPath: metaPath, + Entries: int64(len(saves)), + Component: "", + Extra: nil, + } + backup, err := codec.Serialize(header, saves) + if err != nil { + return err + } + console.Warning(fmt.Sprintf("backup to: %s", backupFile)) + return ioutil.WriteFile(backupFile, backup, 0600) +} + +func (b etcd210) Restore(backupFile string) error { + backup, err := ioutil.ReadFile(backupFile) + if err != nil { + return err + } + codec := NewBackupCodec() + _, saves, err := codec.DeSerialize(backup) + if err != nil { + return err + } + for k, v := range saves { + if err := b.txn.Save(k, v); err != nil { + return err + } + } + return nil +} diff --git a/cmd/tools/migration/backend/etcd220.go b/cmd/tools/migration/backend/etcd220.go new file mode 100644 index 0000000000..af8ba7d434 --- /dev/null +++ b/cmd/tools/migration/backend/etcd220.go @@ -0,0 +1,123 @@ +package backend + +import ( + "fmt" + + "github.com/milvus-io/milvus/cmd/tools/migration/configs" + + "github.com/milvus-io/milvus/internal/util" + + "github.com/milvus-io/milvus/cmd/tools/migration/meta" + "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" +) + +// etcd220 implements Backend. 
+type etcd220 struct { + Backend + *etcdBasedBackend +} + +func newEtcd220(cfg *configs.MilvusConfig) (*etcd220, error) { + etcdBackend, err := newEtcdBasedBackend(cfg) + if err != nil { + return nil, err + } + return &etcd220{etcdBasedBackend: etcdBackend}, nil +} + +func lineSaveTo(key string) { + fmt.Printf("save to %s\n", key) +} + +func printSaves(saves map[string]string) { + for k := range saves { + lineSaveTo(k) + } +} + +func (b etcd220) save(saves map[string]string) error { + for k, v := range saves { + if err := b.txn.Save(k, v); err != nil { + return err + } + } + return nil +} + +func (b etcd220) Save(metas *meta.Meta) error { + { + saves, err := metas.Meta220.TtCollections.GenerateSaves(metas.SourceVersion) + if err != nil { + return err + } + if err := b.save(saves); err != nil { + return err + } + } + { + saves, err := metas.Meta220.Collections.GenerateSaves(metas.SourceVersion) + if err != nil { + return err + } + if err := b.save(saves); err != nil { + return err + } + } + { + saves, err := metas.Meta220.TtAliases.GenerateSaves() + if err != nil { + return err + } + if err := b.save(saves); err != nil { + return err + } + } + { + saves, err := metas.Meta220.Aliases.GenerateSaves() + if err != nil { + return err + } + if err := b.save(saves); err != nil { + return err + } + } + { + saves, err := metas.Meta220.CollectionIndexes.GenerateSaves() + if err != nil { + return err + } + if err := b.save(saves); err != nil { + return err + } + } + { + saves, err := metas.Meta220.SegmentIndexes.GenerateSaves() + if err != nil { + return err + } + if err := b.save(saves); err != nil { + return err + } + } + + return nil +} + +func (b etcd220) Clean() error { + prefixes := []string{ + rootcoord.CollectionMetaPrefix, + rootcoord.PartitionMetaPrefix, + rootcoord.FieldMetaPrefix, + rootcoord.AliasMetaPrefix, + + util.FieldIndexPrefix, + util.SegmentIndexPrefix, + } + for _, prefix := range prefixes { + if err := b.CleanWithPrefix(prefix); err != nil { + return err 
+ } + lineCleanPrefix(prefix) + } + return nil +} diff --git a/cmd/tools/migration/command/backup.go b/cmd/tools/migration/command/backup.go new file mode 100644 index 0000000000..767278ee3d --- /dev/null +++ b/cmd/tools/migration/command/backup.go @@ -0,0 +1,20 @@ +package command + +import ( + "context" + + "github.com/milvus-io/milvus/cmd/tools/migration/configs" + "github.com/milvus-io/milvus/cmd/tools/migration/console" + "github.com/milvus-io/milvus/cmd/tools/migration/migration" +) + +func Backup(c *configs.Config) { + ctx := context.Background() + runner := migration.NewRunner(ctx, c) + console.ExitIf(runner.CheckSessions()) + console.ExitIf(runner.RegisterSession()) + defer runner.Stop() + // double check. + console.ExitIf(runner.CheckSessions()) + console.ExitIf(runner.Backup()) +} diff --git a/cmd/tools/migration/command/config.go b/cmd/tools/migration/command/config.go new file mode 100644 index 0000000000..db36b30895 --- /dev/null +++ b/cmd/tools/migration/command/config.go @@ -0,0 +1,24 @@ +package command + +import ( + "flag" + + "github.com/milvus-io/milvus/cmd/tools/migration/console" +) + +type commandParser struct { + configYaml string +} + +func (c *commandParser) formatYaml(args []string, flags *flag.FlagSet) { + flags.StringVar(&c.configYaml, "config", "", "set config yaml") +} + +func (c *commandParser) parse(args []string, flags *flag.FlagSet) { + console.ExitIf(flags.Parse(args[1:])) +} + +func (c *commandParser) format(args []string, flags *flag.FlagSet) { + c.formatYaml(args, flags) + c.parse(args, flags) +} diff --git a/cmd/tools/migration/command/help.go b/cmd/tools/migration/command/help.go new file mode 100644 index 0000000000..6d1f8d0c31 --- /dev/null +++ b/cmd/tools/migration/command/help.go @@ -0,0 +1,12 @@ +package command + +import "fmt" + +var ( + usageLineV2 = fmt.Sprintf("Usage:\n"+ + "%s\n", runLineV2) + + runLineV2 = ` +migration -config=config.yaml +` +) diff --git a/cmd/tools/migration/command/main.go 
b/cmd/tools/migration/command/main.go
new file mode 100644
index 0000000000..128455f9ca
--- /dev/null
+++ b/cmd/tools/migration/command/main.go
@@ -0,0 +1,41 @@
+package command
+
+import (
+	"flag"
+	"fmt"
+	"os"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/console"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/configs"
+)
+
+// Execute builds a flag set named after args[0], hands args to
+// commandParser.format, loads the yaml config it yields and dispatches on the
+// configured command. It exits the process when no config or command is set.
+func Execute(args []string) {
+	// args[0] is read below; exit with a message instead of panicking when args is empty.
+	console.ErrorExitIf(len(args) == 0, "no arguments given")
+
+	flags := flag.NewFlagSet(args[0], flag.ExitOnError)
+	flags.Usage = func() {
+		fmt.Fprintln(os.Stderr, usageLineV2)
+	}
+
+	c := &commandParser{}
+	c.format(args, flags)
+
+	console.ErrorExitIf(c.configYaml == "", "config not set")
+
+	cfg := configs.NewConfig(c.configYaml)
+	switch cfg.Cmd {
+	case configs.RunCmd:
+		Run(cfg)
+	case configs.BackupCmd:
+		Backup(cfg)
+	case configs.RollbackCmd:
+		Rollback(cfg)
+	default:
+		console.Exit(fmt.Sprintf("cmd not set or not supported: %s", cfg.Cmd))
+	}
+}
diff --git a/cmd/tools/migration/command/rollback.go b/cmd/tools/migration/command/rollback.go
new file mode 100644
index 0000000000..ece126a489
--- /dev/null
+++ b/cmd/tools/migration/command/rollback.go
@@ -0,0 +1,31 @@
+package command
+
+import (
+	"context"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/configs"
+	"github.com/milvus-io/milvus/cmd/tools/migration/console"
+	"github.com/milvus-io/milvus/cmd/tools/migration/migration"
+)
+
+// Rollback runs the rollback command: it checks sessions, registers this
+// tool's own session, checks sessions again and then calls runner.Rollback.
+// Any error ends the process with a non-zero status.
+func Rollback(c *configs.Config) {
+	ctx := context.Background()
+	runner := migration.NewRunner(ctx, c)
+	console.ExitIf(runner.CheckSessions())
+	console.ExitIf(runner.RegisterSession())
+	defer runner.Stop()
+	// console.Exit calls os.Exit, which skips deferred calls, so the runner is
+	// stopped explicitly on the failure paths below.
+	stopAndExitIf := func(err error) {
+		if err != nil {
+			runner.Stop()
+			console.Exit(err.Error())
+		}
+	}
+	// double check.
+	stopAndExitIf(runner.CheckSessions())
+	stopAndExitIf(runner.Rollback())
+}
diff --git a/cmd/tools/migration/command/run.go b/cmd/tools/migration/command/run.go
new file mode 100644
index 0000000000..d1fd8713c7
--- /dev/null
+++ b/cmd/tools/migration/command/run.go
@@ -0,0 +1,38 @@
+package command
+
+import (
+	"context"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/configs"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/console"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/migration"
+)
+
+// Run runs the migration command: check sessions, register this tool's own
+// session, check sessions again, validate, and migrate unless
+// runner.CheckCompatible reports that nothing needs to be done.
+func Run(c *configs.Config) {
+	ctx := context.Background()
+	runner := migration.NewRunner(ctx, c)
+	console.ExitIf(runner.CheckSessions())
+	console.ExitIf(runner.RegisterSession())
+	defer runner.Stop()
+	// The console exit helpers call os.Exit, which skips deferred calls, so the
+	// runner is stopped explicitly on every early-exit path below.
+	stopAndExitIf := func(err error) {
+		if err != nil {
+			runner.Stop()
+			console.Exit(err.Error())
+		}
+	}
+	// double check.
+	stopAndExitIf(runner.CheckSessions())
+	stopAndExitIf(runner.Validate())
+	if runner.CheckCompatible() {
+		runner.Stop()
+		console.NormalExit("version compatible, no need to migrate")
+	}
+	stopAndExitIf(runner.Migrate())
+}
diff --git a/cmd/tools/migration/configs/config.go b/cmd/tools/migration/configs/config.go
new file mode 100644
index 0000000000..0eb511f7a1
--- /dev/null
+++ b/cmd/tools/migration/configs/config.go
@@ -0,0 +1,104 @@
+package configs
+
+import (
+	"fmt"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/console"
+	"github.com/milvus-io/milvus/internal/util"
+	"github.com/milvus-io/milvus/internal/util/paramtable"
+)
+
+// Values accepted for the cmd.type entry of the config file.
+const (
+	RunCmd      = "run"
+	BackupCmd   = "backup"
+	RollbackCmd = "rollback"
+)
+
+// RunConfig holds the tool's own settings, read from the cmd.type and config.* entries.
+type RunConfig struct {
+	base           *paramtable.BaseTable
+	Cmd            string
+	SourceVersion  string
+	TargetVersion  string
+	BackupFilePath string
+}
+
+func newRunConfig(base *paramtable.BaseTable) *RunConfig {
+	c := &RunConfig{}
+	c.init(base)
+	return c
+}
+
+func (c *RunConfig) String() string {
+	return fmt.Sprintf("Cmd: %s, SourceVersion: %s, TargetVersion: %s, BackupFilePath: %s",
+		c.Cmd, c.SourceVersion, c.TargetVersion, c.BackupFilePath)
+}
+
+func (c *RunConfig) show() {
+	console.Warning(c.String())
+}
+
+func (c *RunConfig) init(base *paramtable.BaseTable) {
+	c.base = base
+
+	c.Cmd = c.base.LoadWithDefault("cmd.type", "")
+	c.SourceVersion = c.base.LoadWithDefault("config.sourceVersion", "")
+	c.TargetVersion = c.base.LoadWithDefault("config.targetVersion", "")
+	c.BackupFilePath = c.base.LoadWithDefault("config.backupFilePath", "")
+
+	c.show()
+}
+
+// MilvusConfig holds the meta store, etcd and MySQL settings. The MySQL
+// settings are only loaded when the meta store type is util.MetaStoreTypeMysql.
+type MilvusConfig struct {
+	MetaStoreCfg *paramtable.MetaStoreConfig
+	EtcdCfg      *paramtable.EtcdConfig
+	MysqlCfg     *paramtable.MetaDBConfig
+}
+
+func newMilvusConfig(base *paramtable.BaseTable) *MilvusConfig {
+	c := &MilvusConfig{}
+	c.init(base)
+	return c
+}
+
+func (c *MilvusConfig) init(base *paramtable.BaseTable) {
+	c.MetaStoreCfg = &paramtable.MetaStoreConfig{}
+	c.EtcdCfg = &paramtable.EtcdConfig{}
+	c.MysqlCfg = &paramtable.MetaDBConfig{}
+
+	c.MetaStoreCfg.Base = base
+	c.MetaStoreCfg.LoadCfgToMemory()
+
+	switch c.MetaStoreCfg.MetaStoreType {
+	case util.MetaStoreTypeMysql:
+		c.MysqlCfg.Base = base
+		c.MysqlCfg.LoadCfgToMemory()
+	default:
+	}
+
+	c.EtcdCfg.Base = base
+	c.EtcdCfg.LoadCfgToMemory()
+}
+
+// Config bundles the RunConfig and the MilvusConfig loaded from one yaml file.
+type Config struct {
+	base *paramtable.BaseTable
+	*RunConfig
+	*MilvusConfig
+}
+
+func (c *Config) init(yamlFile string) {
+	c.base = paramtable.NewBaseTableFromYamlOnly(yamlFile)
+	c.RunConfig = newRunConfig(c.base)
+	c.MilvusConfig = newMilvusConfig(c.base)
+}
+
+// NewConfig loads yamlFile and returns the resulting Config.
+func NewConfig(yamlFile string) *Config {
+	c := &Config{}
+	c.init(yamlFile)
+	return c
+}
diff --git a/cmd/tools/migration/console/console.go b/cmd/tools/migration/console/console.go
new file mode 100644
index 0000000000..801bdb3aae
--- /dev/null
+++ b/cmd/tools/migration/console/console.go
@@ -0,0 +1,68 @@
+package console
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/mgutz/ansi"
+)
+
+// Success prints msg in green.
+func Success(msg string) {
+	colorOut(msg, "green")
+}
+
+// Error prints msg in red.
+func Error(msg string) {
+	colorOut(msg, "red")
+}
+
+// Warning prints msg in yellow.
+func Warning(msg string) {
+	colorOut(msg, "yellow")
+}
+
+// Exit prints msg as an error and ends the process with status 1. Deferred
+// calls of the caller do not run, because os.Exit is used.
+func Exit(msg string) {
+	Error(msg)
+	os.Exit(1)
+}
+
+// ExitIf calls Exit with err's message when err is not nil.
+func ExitIf(err error) {
+	if err != nil {
+		Exit(err.Error())
+	}
+}
+
+// NormalExit prints msg as a success and ends the process with status 0.
+func NormalExit(msg string) {
+	Success(msg)
+	os.Exit(0)
+}
+
+// NormalExitIf calls NormalExit(msg) when success is true.
+func NormalExitIf(success bool, msg string) {
+	if success {
+		NormalExit(msg)
+	}
+}
+
+// ErrorExit prints msg as a warning and ends the process with status 1.
+func ErrorExit(msg string) {
+	Warning(msg)
+	os.Exit(1)
+}
+
+// ErrorExitIf calls ErrorExit(msg) when fail is true.
+func ErrorExitIf(fail bool, msg string) {
+	if fail {
+		ErrorExit(msg)
+	}
+}
+
+// colorOut writes message to stdout, colored through ansi.Color.
+func colorOut(message, color string) {
+	fmt.Fprintln(os.Stdout, ansi.Color(message, color))
+}
diff --git a/cmd/tools/migration/console/console_test.go b/cmd/tools/migration/console/console_test.go
new file mode 100644
index 0000000000..ac0c4bbaf4
--- /dev/null
+++ b/cmd/tools/migration/console/console_test.go
@@ -0,0 +1,21 @@
+package console
+
+import (
+	"testing"
+)
+
+func TestSuccess(t *testing.T) {
+	Success("success")
+}
+
+func TestError(t *testing.T) {
+	Error("error")
+}
+
+func TestWarning(t *testing.T) {
+	Warning("warning")
+}
+
+func TestExitIf(t *testing.T) {
+	ExitIf(nil)
+}
diff --git a/cmd/tools/migration/example.yaml b/cmd/tools/migration/example.yaml
new file mode 100644
index 0000000000..ca9c7b4f0f
--- /dev/null
+++ b/cmd/tools/migration/example.yaml
@@ -0,0 +1,44 @@
+cmd:
+  # Option: run/backup/rollback
+  type: run
+
+config:
+  sourceVersion: 2.1.0
+  targetVersion: 2.2.0
+  backupFilePath: /tmp/migration.bak
+
+metastore:
+  type: etcd
+
+etcd:
+  endpoints:
+    - localhost:2379
+  rootPath: by-dev # The root path where data is stored in etcd
+  metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
+  kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
+  log:
+    # path is one of:
+    # - "default" as os.Stderr,
+    # - "stderr" as os.Stderr,
+    # - "stdout" as os.Stdout,
+    # - file path to append server logs to.
+    # please adjust in embedded Milvus: /tmp/milvus/logs/etcd.log
+    path: stdout
+    level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
+  use:
+    # please adjust in embedded Milvus: true
+    embed: false # Whether to enable embedded Etcd (an in-process EtcdServer).
+  data:
+    # Embedded Etcd only.
+    # please adjust in embedded Milvus: /tmp/milvus/etcdData/
+    dir: default.etcd
+  ssl:
+    enabled: false # Whether to support ETCD secure connection mode
+    tlsCert: /path/to/etcd-client.pem # path to your cert file
+    tlsKey: /path/to/etcd-client-key.pem # path to your key file
+    tlsCACert: /path/to/ca.pem # path to your CACert file
+    # TLS min version
+    # Optional values: 1.0, 1.1, 1.2, 1.3.
+    # We recommend using version 1.2 and above
+    tlsMinVersion: 1.3
+
diff --git a/cmd/tools/migration/legacy/constant.go b/cmd/tools/migration/legacy/constant.go
new file mode 100644
index 0000000000..b84595eee7
--- /dev/null
+++ b/cmd/tools/migration/legacy/constant.go
@@ -0,0 +1,13 @@
+package legacy
+
+import "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
+
+// Key prefixes whose names mark them as belonging to the meta layout used
+// before version 2.2.0. All but the last are rooted at rootcoord.ComponentPrefix.
+const (
+	DDOperationPrefixBefore220  = rootcoord.ComponentPrefix + "/dd-operation"
+	DDMsgSendPrefixBefore220    = rootcoord.ComponentPrefix + "/dd-msg-send"
+	IndexMetaBefore220Prefix    = rootcoord.ComponentPrefix + "/index"
+	SegmentIndexPrefixBefore220 = rootcoord.ComponentPrefix + "/segment-index"
+	IndexBuildPrefixBefore220   = "indexes"
+)
diff --git a/cmd/tools/migration/legacy/legacy.proto b/cmd/tools/migration/legacy/legacy.proto
new file mode 100644
index 0000000000..fe44f3f79e
--- /dev/null
+++ b/cmd/tools/migration/legacy/legacy.proto
@@ -0,0 +1,38 @@
+syntax = "proto3";
+
+package milvus.proto.legacy;
+
+option go_package = "github.com/milvus-io/milvus/internal/proto/legacypb";
+
+import "common.proto";
+import "schema.proto";
+
+/********************************** Index Build Meta Before Version 220 ***************************************/
+
+// BuildIndexRequest is embedded in IndexMeta as its req field.
+// Field number 4 is not assigned in this message.
+message BuildIndexRequest {
+  int64 indexBuildID = 1;
+  string index_name = 2;
+  int64 indexID = 3;
+  repeated string data_paths = 5;
+  repeated common.KeyValuePair type_params = 6;
+  repeated common.KeyValuePair index_params = 7;
+  int64 num_rows = 8;
+  schema.FieldSchema field_schema = 9;
+  int64 segmentID = 10;
+}
+
+// IndexMeta is the index build record, identified by indexBuildID.
+message IndexMeta {
+  int64 indexBuildID = 1;
+  common.IndexState state = 2;
+  string fail_reason = 3;
+  BuildIndexRequest req = 4;
+  repeated string index_file_paths = 5;
+  bool mark_deleted = 6;
+  int64 nodeID = 7;
+  int64 index_version = 8;
+  bool recycled = 9;
+  uint64 serialize_size = 10;
+}
\ No newline at end of file
diff --git a/cmd/tools/migration/legacy/legacypb/legacy.pb.go b/cmd/tools/migration/legacy/legacypb/legacy.pb.go
new file mode 100644
index 0000000000..6f910679eb
--- /dev/null
+++ b/cmd/tools/migration/legacy/legacypb/legacy.pb.go
@@ -0,0 +1,280 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: legacy.proto
+
+package legacypb
+
+import (
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	commonpb "github.com/milvus-io/milvus/api/commonpb"
+	schemapb "github.com/milvus-io/milvus/api/schemapb"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type BuildIndexRequest struct { + IndexBuildID int64 `protobuf:"varint,1,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"` + IndexName string `protobuf:"bytes,2,opt,name=index_name,json=indexName,proto3" json:"index_name,omitempty"` + IndexID int64 `protobuf:"varint,3,opt,name=indexID,proto3" json:"indexID,omitempty"` + DataPaths []string `protobuf:"bytes,5,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"` + TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,6,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"` + IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,7,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` + NumRows int64 `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"` + FieldSchema *schemapb.FieldSchema `protobuf:"bytes,9,opt,name=field_schema,json=fieldSchema,proto3" json:"field_schema,omitempty"` + SegmentID int64 `protobuf:"varint,10,opt,name=segmentID,proto3" json:"segmentID,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *BuildIndexRequest) Reset() { *m = BuildIndexRequest{} } +func (m *BuildIndexRequest) String() string { return proto.CompactTextString(m) } +func (*BuildIndexRequest) ProtoMessage() {} +func (*BuildIndexRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_4b5c555c498591f0, []int{0} +} + +func (m *BuildIndexRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_BuildIndexRequest.Unmarshal(m, b) +} +func (m *BuildIndexRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_BuildIndexRequest.Marshal(b, m, deterministic) +} +func (m *BuildIndexRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_BuildIndexRequest.Merge(m, src) +} +func (m *BuildIndexRequest) XXX_Size() int { + return 
xxx_messageInfo_BuildIndexRequest.Size(m) +} +func (m *BuildIndexRequest) XXX_DiscardUnknown() { + xxx_messageInfo_BuildIndexRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_BuildIndexRequest proto.InternalMessageInfo + +func (m *BuildIndexRequest) GetIndexBuildID() int64 { + if m != nil { + return m.IndexBuildID + } + return 0 +} + +func (m *BuildIndexRequest) GetIndexName() string { + if m != nil { + return m.IndexName + } + return "" +} + +func (m *BuildIndexRequest) GetIndexID() int64 { + if m != nil { + return m.IndexID + } + return 0 +} + +func (m *BuildIndexRequest) GetDataPaths() []string { + if m != nil { + return m.DataPaths + } + return nil +} + +func (m *BuildIndexRequest) GetTypeParams() []*commonpb.KeyValuePair { + if m != nil { + return m.TypeParams + } + return nil +} + +func (m *BuildIndexRequest) GetIndexParams() []*commonpb.KeyValuePair { + if m != nil { + return m.IndexParams + } + return nil +} + +func (m *BuildIndexRequest) GetNumRows() int64 { + if m != nil { + return m.NumRows + } + return 0 +} + +func (m *BuildIndexRequest) GetFieldSchema() *schemapb.FieldSchema { + if m != nil { + return m.FieldSchema + } + return nil +} + +func (m *BuildIndexRequest) GetSegmentID() int64 { + if m != nil { + return m.SegmentID + } + return 0 +} + +type IndexMeta struct { + IndexBuildID int64 `protobuf:"varint,1,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"` + State commonpb.IndexState `protobuf:"varint,2,opt,name=state,proto3,enum=milvus.proto.common.IndexState" json:"state,omitempty"` + FailReason string `protobuf:"bytes,3,opt,name=fail_reason,json=failReason,proto3" json:"fail_reason,omitempty"` + Req *BuildIndexRequest `protobuf:"bytes,4,opt,name=req,proto3" json:"req,omitempty"` + IndexFilePaths []string `protobuf:"bytes,5,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"` + MarkDeleted bool `protobuf:"varint,6,opt,name=mark_deleted,json=markDeleted,proto3" json:"mark_deleted,omitempty"` + NodeID 
int64 `protobuf:"varint,7,opt,name=nodeID,proto3" json:"nodeID,omitempty"` + IndexVersion int64 `protobuf:"varint,8,opt,name=index_version,json=indexVersion,proto3" json:"index_version,omitempty"` + Recycled bool `protobuf:"varint,9,opt,name=recycled,proto3" json:"recycled,omitempty"` + SerializeSize uint64 `protobuf:"varint,10,opt,name=serialize_size,json=serializeSize,proto3" json:"serialize_size,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *IndexMeta) Reset() { *m = IndexMeta{} } +func (m *IndexMeta) String() string { return proto.CompactTextString(m) } +func (*IndexMeta) ProtoMessage() {} +func (*IndexMeta) Descriptor() ([]byte, []int) { + return fileDescriptor_4b5c555c498591f0, []int{1} +} + +func (m *IndexMeta) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_IndexMeta.Unmarshal(m, b) +} +func (m *IndexMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_IndexMeta.Marshal(b, m, deterministic) +} +func (m *IndexMeta) XXX_Merge(src proto.Message) { + xxx_messageInfo_IndexMeta.Merge(m, src) +} +func (m *IndexMeta) XXX_Size() int { + return xxx_messageInfo_IndexMeta.Size(m) +} +func (m *IndexMeta) XXX_DiscardUnknown() { + xxx_messageInfo_IndexMeta.DiscardUnknown(m) +} + +var xxx_messageInfo_IndexMeta proto.InternalMessageInfo + +func (m *IndexMeta) GetIndexBuildID() int64 { + if m != nil { + return m.IndexBuildID + } + return 0 +} + +func (m *IndexMeta) GetState() commonpb.IndexState { + if m != nil { + return m.State + } + return commonpb.IndexState_IndexStateNone +} + +func (m *IndexMeta) GetFailReason() string { + if m != nil { + return m.FailReason + } + return "" +} + +func (m *IndexMeta) GetReq() *BuildIndexRequest { + if m != nil { + return m.Req + } + return nil +} + +func (m *IndexMeta) GetIndexFilePaths() []string { + if m != nil { + return m.IndexFilePaths + } + return nil +} + +func (m *IndexMeta) GetMarkDeleted() 
bool { + if m != nil { + return m.MarkDeleted + } + return false +} + +func (m *IndexMeta) GetNodeID() int64 { + if m != nil { + return m.NodeID + } + return 0 +} + +func (m *IndexMeta) GetIndexVersion() int64 { + if m != nil { + return m.IndexVersion + } + return 0 +} + +func (m *IndexMeta) GetRecycled() bool { + if m != nil { + return m.Recycled + } + return false +} + +func (m *IndexMeta) GetSerializeSize() uint64 { + if m != nil { + return m.SerializeSize + } + return 0 +} + +func init() { + proto.RegisterType((*BuildIndexRequest)(nil), "milvus.proto.legacy.BuildIndexRequest") + proto.RegisterType((*IndexMeta)(nil), "milvus.proto.legacy.IndexMeta") +} + +func init() { proto.RegisterFile("legacy.proto", fileDescriptor_4b5c555c498591f0) } + +var fileDescriptor_4b5c555c498591f0 = []byte{ + // 511 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xc1, 0x6b, 0xdb, 0x30, + 0x14, 0xc6, 0xc9, 0xdc, 0xa6, 0xf1, 0xb3, 0x5b, 0x36, 0x0d, 0x86, 0x56, 0x36, 0xea, 0x66, 0x6c, + 0xf8, 0xb2, 0x04, 0x52, 0x0a, 0x3b, 0x67, 0xa6, 0x10, 0xc6, 0x46, 0x50, 0xa0, 0x87, 0x5d, 0x8c, + 0x12, 0xbf, 0x24, 0x62, 0xb2, 0x9d, 0x4a, 0x72, 0xbb, 0xe4, 0x8f, 0xd8, 0x75, 0xff, 0xee, 0x90, + 0xe4, 0xb6, 0x84, 0xf5, 0xd0, 0x9b, 0xbf, 0x9f, 0xf5, 0x3d, 0xf9, 0x7d, 0x9f, 0x21, 0x96, 0xb8, + 0xe2, 0x8b, 0xed, 0x60, 0xa3, 0x6a, 0x53, 0x93, 0xd7, 0xa5, 0x90, 0xb7, 0x8d, 0xf6, 0x6a, 0xe0, + 0x5f, 0x9d, 0xc6, 0x8b, 0xba, 0x2c, 0xeb, 0xca, 0xc3, 0xd3, 0x58, 0x2f, 0xd6, 0x58, 0x72, 0xaf, + 0xfa, 0x7f, 0x03, 0x78, 0x35, 0x6e, 0x84, 0x2c, 0x26, 0x55, 0x81, 0xbf, 0x19, 0xde, 0x34, 0xa8, + 0x0d, 0xe9, 0x43, 0x2c, 0xac, 0xf6, 0x6f, 0x32, 0xda, 0x49, 0x3a, 0x69, 0xc0, 0xf6, 0x18, 0x79, + 0x0f, 0xe0, 0x74, 0x5e, 0xf1, 0x12, 0xe9, 0x8b, 0xa4, 0x93, 0x86, 0x2c, 0x74, 0xe4, 0x07, 0x2f, + 0x91, 0x50, 0x38, 0x72, 0x62, 0x92, 0xd1, 0xc0, 0xb9, 0xef, 0xa5, 0x35, 0x16, 0xdc, 0xf0, 0x7c, + 0xc3, 0xcd, 0x5a, 0xd3, 0xc3, 0x24, 0xb0, 0x46, 0x4b, 0xa6, 0x16, 0x90, 
0x31, 0x44, 0x66, 0xbb, + 0xc1, 0x7c, 0xc3, 0x15, 0x2f, 0x35, 0xed, 0x26, 0x41, 0x1a, 0x8d, 0xce, 0x07, 0x7b, 0x8b, 0xb5, + 0x0b, 0x7d, 0xc3, 0xed, 0x35, 0x97, 0x0d, 0x4e, 0xb9, 0x50, 0x0c, 0xac, 0x6b, 0xea, 0x4c, 0x24, + 0x6b, 0xbf, 0xff, 0x7e, 0xc8, 0xd1, 0x73, 0x87, 0x44, 0xce, 0xd6, 0x4e, 0x79, 0x0b, 0xbd, 0xaa, + 0x29, 0x73, 0x55, 0xdf, 0x69, 0xda, 0xf3, 0x3b, 0x54, 0x4d, 0xc9, 0xea, 0x3b, 0x4d, 0xbe, 0x42, + 0xbc, 0x14, 0x28, 0x8b, 0xdc, 0x87, 0x49, 0xc3, 0xa4, 0x93, 0x46, 0xa3, 0x64, 0xff, 0x82, 0x36, + 0xe8, 0x2b, 0x7b, 0x70, 0xe6, 0x9e, 0x59, 0xb4, 0x7c, 0x14, 0xe4, 0x1d, 0x84, 0x1a, 0x57, 0x25, + 0x56, 0x66, 0x92, 0x51, 0x70, 0x17, 0x3c, 0x82, 0xfe, 0x9f, 0x00, 0x42, 0x57, 0xca, 0x77, 0x34, + 0xfc, 0x59, 0x8d, 0x5c, 0xc2, 0xa1, 0x36, 0xdc, 0xf8, 0x32, 0x4e, 0x46, 0x67, 0x4f, 0xae, 0xeb, + 0x46, 0xce, 0xec, 0x31, 0xe6, 0x4f, 0x93, 0x33, 0x88, 0x96, 0x5c, 0xc8, 0x5c, 0x21, 0xd7, 0x75, + 0xe5, 0xda, 0x0a, 0x19, 0x58, 0xc4, 0x1c, 0x21, 0x5f, 0x20, 0x50, 0x78, 0x43, 0x0f, 0xdc, 0x8e, + 0x9f, 0x06, 0x4f, 0xfc, 0x62, 0x83, 0xff, 0x7e, 0x21, 0x66, 0x2d, 0x24, 0x85, 0x97, 0xbe, 0x87, + 0xa5, 0x90, 0xb8, 0x57, 0xf8, 0x89, 0xe3, 0x57, 0x42, 0xa2, 0x6f, 0xfd, 0x1c, 0xe2, 0x92, 0xab, + 0x5f, 0x79, 0x81, 0x12, 0x0d, 0x16, 0xb4, 0x9b, 0x74, 0xd2, 0x1e, 0x8b, 0x2c, 0xcb, 0x3c, 0x22, + 0x6f, 0xa0, 0x5b, 0xd5, 0x05, 0x4e, 0x32, 0x7a, 0xe4, 0x96, 0x6f, 0x15, 0xf9, 0x00, 0xc7, 0xfe, + 0x92, 0x5b, 0x54, 0x5a, 0xd4, 0x55, 0xdb, 0x95, 0xcf, 0xe6, 0xda, 0x33, 0x72, 0x0a, 0x3d, 0x85, + 0x8b, 0xed, 0x42, 0x62, 0xe1, 0xca, 0xea, 0xb1, 0x07, 0x4d, 0x3e, 0xc2, 0x89, 0x46, 0x25, 0xb8, + 0x14, 0x3b, 0xcc, 0xb5, 0xd8, 0xa1, 0x2b, 0xe3, 0x80, 0x1d, 0x3f, 0xd0, 0x99, 0xd8, 0xe1, 0xf8, + 0xf2, 0xe7, 0xc5, 0x4a, 0x98, 0x75, 0x33, 0xb7, 0x51, 0x0e, 0x7d, 0x0a, 0x9f, 0x45, 0xdd, 0x3e, + 0x0d, 0x45, 0x65, 0x50, 0x55, 0x5c, 0x0e, 0x5d, 0x30, 0x43, 0x1f, 0xcc, 0x66, 0x3e, 0xef, 0x3a, + 0x7d, 0xf1, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x46, 0x37, 0x11, 0xcb, 0xa9, 0x03, 0x00, 0x00, +} diff --git 
a/cmd/tools/migration/legacy/util.go b/cmd/tools/migration/legacy/util.go
new file mode 100644
index 0000000000..36a4d6a13c
--- /dev/null
+++ b/cmd/tools/migration/legacy/util.go
@@ -0,0 +1,22 @@
+package legacy
+
+import (
+	"fmt"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/utils"
+)
+
+// BuildCollectionIndexKey210 returns IndexMetaBefore220Prefix/<collectionID>/<indexID>.
+func BuildCollectionIndexKey210(collectionID, indexID utils.UniqueID) string {
+	return fmt.Sprintf("%s/%d/%d", IndexMetaBefore220Prefix, collectionID, indexID)
+}
+
+// BuildSegmentIndexKey210 returns SegmentIndexPrefixBefore220/<segmentID>/<indexID>.
+func BuildSegmentIndexKey210(segmentID, indexID utils.UniqueID) string {
+	return fmt.Sprintf("%s/%d/%d", SegmentIndexPrefixBefore220, segmentID, indexID)
+}
+
+// BuildIndexBuildKey210 returns IndexBuildPrefixBefore220/<buildID>.
+func BuildIndexBuildKey210(buildID utils.UniqueID) string {
+	return fmt.Sprintf("%s/%d", IndexBuildPrefixBefore220, buildID)
+}
diff --git a/cmd/tools/migration/main.go b/cmd/tools/migration/main.go
new file mode 100644
index 0000000000..9eea1fdba3
--- /dev/null
+++ b/cmd/tools/migration/main.go
@@ -0,0 +1,11 @@
+package main
+
+import (
+	"os"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/command"
+)
+
+func main() {
+	command.Execute(os.Args)
+}
diff --git a/cmd/tools/migration/meta/210_to_220.go b/cmd/tools/migration/meta/210_to_220.go
new file mode 100644
index 0000000000..ec575c67f5
--- /dev/null
+++ b/cmd/tools/migration/meta/210_to_220.go
@@ -0,0 +1,240 @@
+package meta
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/milvus-io/milvus/internal/util/typeutil"
+
+	"github.com/milvus-io/milvus/internal/metastore/model"
+	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/versions"
+)
+
+// alias210ToAlias220 converts a 2.1.0 alias record, which is stored as a
+// CollectionInfo, into a 2.2.0 alias model created at ts. A nil record is
+// converted to nil.
+func alias210ToAlias220(record *pb.CollectionInfo, ts Timestamp) *model.Alias {
+	if record == nil {
+		return nil
+	}
+	return &model.Alias{
+		Name:         record.GetSchema().GetName(),
+		CollectionID: record.GetID(),
+		CreatedTime:  ts,
+		State:        pb.AliasState_AliasCreated,
+	}
+}
+
+// to220 converts every (alias, ts) record with alias210ToAlias220.
+func (meta *TtAliasesMeta210) to220() (TtAliasesMeta220, error) {
+	ttAliases := make(TtAliasesMeta220)
+	for alias := range *meta {
+		for ts := range (*meta)[alias] {
+			aliasModel := alias210ToAlias220((*meta)[alias][ts], ts)
+			ttAliases.AddAlias(alias, aliasModel, ts)
+		}
+	}
+	return ttAliases, nil
+}
+
+// to220 converts every alias record with alias210ToAlias220, using 0 as the creation time.
+func (meta *AliasesMeta210) to220() (AliasesMeta220, error) {
+	aliases := make(AliasesMeta220)
+	for alias := range *meta {
+		aliasModel := alias210ToAlias220((*meta)[alias], 0)
+		aliases.AddAlias(alias, aliasModel)
+	}
+	return aliases, nil
+}
+
+// getLatestFieldIndexes returns the field indexes and schema of the snapshot
+// with the largest timestamp in colls, or nil when colls is empty or that
+// snapshot is nil.
+func getLatestFieldIndexes(colls map[Timestamp]*pb.CollectionInfo) *FieldIndexesWithSchema {
+	type pair struct {
+		ts   Timestamp
+		coll *pb.CollectionInfo
+	}
+	l := len(colls)
+	// Start with length 0: with make([]pair, l) the appends below would leave l
+	// zero-valued pairs in front, and pairs[l-1] would not be the latest snapshot.
+	pairs := make([]pair, 0, l)
+	for ts, coll := range colls {
+		pairs = append(pairs, pair{ts: ts, coll: coll})
+	}
+	sort.Slice(pairs, func(i, j int) bool {
+		return pairs[i].ts < pairs[j].ts
+	})
+	if l > 0 && pairs[l-1].coll != nil {
+		return &FieldIndexesWithSchema{indexes: pairs[l-1].coll.GetFieldIndexes(), schema: pairs[l-1].coll.GetSchema()}
+	}
+	return nil
+}
+
+// collection210ToCollection220 converts coll through model.UnmarshalCollectionModel.
+func collection210ToCollection220(coll *pb.CollectionInfo) *model.Collection {
+	return model.UnmarshalCollectionModel(coll)
+}
+
+// to220 converts every (collection, ts) snapshot to the 2.2.0 model. It also
+// returns, per collection, the field indexes and schema of the latest snapshot
+// when getLatestFieldIndexes finds one.
+func (meta *TtCollectionsMeta210) to220() (TtCollectionsMeta220, FieldIndexes210, error) {
+	ttCollections := make(TtCollectionsMeta220)
+	fieldIndexes := make(FieldIndexes210)
+	for collectionID := range *meta {
+		colls := (*meta)[collectionID]
+		indexes := getLatestFieldIndexes(colls)
+		if indexes != nil {
+			fieldIndexes.AddRecord(collectionID, indexes.indexes, indexes.schema)
+		}
+		for ts := range colls {
+			coll := colls[ts]
+			ttCollections.AddCollection(collectionID, collection210ToCollection220(coll), ts)
+		}
+	}
+	return ttCollections, fieldIndexes, nil
+}
+
+// to220 converts every collection to the 2.2.0 model and also returns the field
+// indexes and schema of each collection.
+func (meta *CollectionsMeta210) to220() (CollectionsMeta220, FieldIndexes210, error) {
+	collections := make(CollectionsMeta220)
+	fieldIndexes := make(FieldIndexes210)
+	for collectionID := range *meta {
+		coll := (*meta)[collectionID]
+		fieldIndexes.AddRecord(collectionID, coll.GetFieldIndexes(), coll.GetSchema())
+		collections.AddCollection(collectionID, collection210ToCollection220(coll))
+	}
+	return collections, fieldIndexes, nil
+}
+
+// combineToCollectionIndexesMeta220 builds one 2.2.0 index model for each field
+// index in fieldIndexes, completing it with the matching entry of
+// collectionIndexes and the type params of the indexed field. Collections
+// without a schema are reported on stdout and skipped.
+func combineToCollectionIndexesMeta220(fieldIndexes FieldIndexes210, collectionIndexes CollectionIndexesMeta210) (CollectionIndexesMeta220, error) {
+	indexes := make(CollectionIndexesMeta220)
+	for collectionID := range fieldIndexes {
+		record := fieldIndexes[collectionID]
+		if record.schema == nil {
+			fmt.Println("combineToCollectionIndexesMeta220, nil schema: ", collectionID, ", record: ", record)
+			continue
+		}
+		helper, err := typeutil.CreateSchemaHelper(record.schema)
+		if err != nil {
+			return nil, err
+		}
+		for _, index := range record.indexes {
+			field, err := helper.GetFieldFromID(index.GetFiledID())
+			if err != nil {
+				return nil, err
+			}
+			indexInfo, err := collectionIndexes.GetIndex(collectionID, index.GetIndexID())
+			if err != nil {
+				return nil, err
+			}
+			indexModel := &model.Index{
+				TenantID:     "", // TODO: how to set this if we support mysql later?
+				CollectionID: collectionID,
+				FieldID:      index.GetFiledID(),
+				IndexID:      index.GetIndexID(),
+				IndexName:    indexInfo.GetIndexName(),
+				IsDeleted:    indexInfo.GetDeleted(),
+				CreateTime:   indexInfo.GetCreateTime(),
+				TypeParams:   field.GetTypeParams(),
+				IndexParams:  indexInfo.GetIndexParams(),
+			}
+			indexes.AddRecord(collectionID, index.GetIndexID(), indexModel)
+		}
+	}
+	return indexes, nil
+}
+
+// combineToSegmentIndexesMeta220 builds one 2.2.0 segment index model for each
+// entry of segmentIndexes, joined with its build meta through the build ID. It
+// fails when a build meta is missing.
+func combineToSegmentIndexesMeta220(segmentIndexes SegmentIndexesMeta210, indexBuildMeta IndexBuildMeta210) (SegmentIndexesMeta220, error) {
+	segmentIndexModels := make(SegmentIndexesMeta220)
+	for segID := range segmentIndexes {
+		for indexID := range segmentIndexes[segID] {
+			record := segmentIndexes[segID][indexID]
+			buildMeta, ok := indexBuildMeta[record.GetBuildID()]
+			if !ok {
+				return nil, fmt.Errorf("index build meta not found, segment id: %d, index id: %d, index build id: %d", segID, indexID, record.GetBuildID())
+			}
+			segmentIndexModel := &model.SegmentIndex{
+				SegmentID:      segID,
+				CollectionID:   record.GetCollectionID(),
+				PartitionID:    record.GetPartitionID(),
+				NumRows:        0, // TODO: how to set this?
+				IndexID:        indexID,
+				BuildID:        record.GetBuildID(),
+				NodeID:         buildMeta.GetNodeID(),
+				IndexVersion:   buildMeta.GetIndexVersion(),
+				IndexState:     buildMeta.GetState(),
+				FailReason:     buildMeta.GetFailReason(),
+				IsDeleted:      buildMeta.GetMarkDeleted(),
+				CreateTime:     record.GetCreateTime(),
+				IndexFilePaths: buildMeta.GetIndexFilePaths(),
+				IndexSize:      buildMeta.GetSerializeSize(),
+			}
+			segmentIndexModels.AddRecord(segID, indexID, segmentIndexModel)
+		}
+	}
+	return segmentIndexModels, nil
+}
+
+// From210To220 converts metas, which must be of version 2.1.0, to the 2.2.0
+// layout and returns it as a new Meta whose SourceVersion is the old version.
+// The partition and field metas of the result are created empty here. An
+// error is returned on a version mismatch or when a conversion step fails.
+func From210To220(metas *Meta) (*Meta, error) {
+	if !metas.Version.EQ(versions.Version210) {
+		return nil, fmt.Errorf("version mismatch: %s", metas.Version.String())
+	}
+	ttAliases, err := metas.Meta210.TtAliases.to220()
+	if err != nil {
+		return nil, err
+	}
+	aliases, err := metas.Meta210.Aliases.to220()
+	if err != nil {
+		return nil, err
+	}
+	ttCollections, fieldIndexes, err := metas.Meta210.TtCollections.to220()
+	if err != nil {
+		return nil, err
+	}
+	collections, fieldIndexes2, err := metas.Meta210.Collections.to220()
+	if err != nil {
+		return nil, err
+	}
+	fieldIndexes.Merge(fieldIndexes2)
+	collectionIndexes, err := combineToCollectionIndexesMeta220(fieldIndexes, metas.Meta210.CollectionIndexes)
+	if err != nil {
+		return nil, err
+	}
+	segmentIndexes, err := combineToSegmentIndexesMeta220(metas.Meta210.SegmentIndexes, metas.Meta210.IndexBuildMeta)
+	if err != nil {
+		return nil, err
+	}
+	metas220 := &Meta{
+		SourceVersion: metas.Version,
+		Version:       versions.Version220,
+		Meta220: &All220{
+			TtCollections:     ttCollections,
+			Collections:       collections,
+			TtAliases:         ttAliases,
+			Aliases:           aliases,
+			TtPartitions:      make(TtPartitionsMeta220),
+			Partitions:        make(PartitionsMeta220),
+			TtFields:          make(TtFieldsMeta220),
+			Fields:            make(FieldsMeta220),
+			CollectionIndexes: collectionIndexes,
+			SegmentIndexes:    segmentIndexes,
+		},
+	}
+	return metas220, nil
+}
diff --git a/cmd/tools/migration/meta/meta.go b/cmd/tools/migration/meta/meta.go
new file mode 100644
index 0000000000..b3cf96fc62
--- /dev/null
+++ b/cmd/tools/migration/meta/meta.go
@@ -0,0 +1,20 @@
+package meta
+
+import (
+	"github.com/blang/semver/v4"
+	"github.com/milvus-io/milvus/internal/util/typeutil"
+)
+
+type UniqueID = typeutil.UniqueID
+type Timestamp = typeutil.Timestamp
+
+// Meta carries the meta of one version. Version names the layout the data is
+// in (presumably only the matching one of Meta210 and Meta220 is populated;
+// verify against the loaders), SourceVersion the version it was converted from.
+type Meta struct {
+	SourceVersion semver.Version
+	Version       semver.Version
+
+	Meta210 *All210
+	Meta220 *All220
+}
diff --git a/cmd/tools/migration/meta/meta210.go b/cmd/tools/migration/meta/meta210.go
new file mode 100644
index 0000000000..cb327630a6
--- /dev/null
+++ b/cmd/tools/migration/meta/meta210.go
@@ -0,0 +1,342 @@
+package meta
+
+import (
+	"fmt"
+
+	"github.com/milvus-io/milvus/cmd/tools/migration/legacy"
+	"github.com/milvus-io/milvus/cmd/tools/migration/legacy/legacypb"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
+
+	"github.com/milvus-io/milvus/api/schemapb"
+	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
+)
+
+// FieldIndexesWithSchema pairs a collection's field index entries with its schema.
+type FieldIndexesWithSchema struct {
+	indexes []*pb.FieldIndexInfo
+	schema  *schemapb.CollectionSchema
+}
+type FieldIndexes210 map[UniqueID]*FieldIndexesWithSchema // coll_id -> field indexes.
+
+type TtCollectionsMeta210 map[UniqueID]map[Timestamp]*pb.CollectionInfo // coll_id -> ts -> coll
+type CollectionsMeta210 map[UniqueID]*pb.CollectionInfo                 // coll_id -> coll
+
+type TtAliasesMeta210 map[string]map[Timestamp]*pb.CollectionInfo // alias name -> ts -> coll
+type AliasesMeta210 map[string]*pb.CollectionInfo                 // alias name -> coll
+
+type CollectionIndexesMeta210 map[UniqueID]map[UniqueID]*pb.IndexInfo     // coll_id -> index_id -> index
+type SegmentIndexesMeta210 map[UniqueID]map[UniqueID]*pb.SegmentIndexInfo // seg_id -> index_id -> segment index
+
+type IndexBuildMeta210 map[UniqueID]*legacypb.IndexMeta // index_build_id -> index
+
+type LastDDLRecords map[string]string // We don't care about this since it didn't work.
+
+// All210 groups every kind of 2.1.0 meta handled by this tool.
+type All210 struct {
+	TtAliases TtAliasesMeta210
+	Aliases   AliasesMeta210
+
+	TtCollections TtCollectionsMeta210
+	Collections   CollectionsMeta210
+
+	CollectionIndexes CollectionIndexesMeta210
+	SegmentIndexes    SegmentIndexesMeta210
+	IndexBuildMeta    IndexBuildMeta210
+
+	LastDDLRecords LastDDLRecords
+}
+
+// GenerateSaves returns the key-value pairs of all contained metas merged into
+// a single map.
+func (meta *All210) GenerateSaves() map[string]string {
+	ttAliases := meta.TtAliases.GenerateSaves()
+	aliases := meta.Aliases.GenerateSaves()
+	ttCollections := meta.TtCollections.GenerateSaves()
+	collections := meta.Collections.GenerateSaves()
+	collectionIndexes := meta.CollectionIndexes.GenerateSaves()
+	segmentIndexes := meta.SegmentIndexes.GenerateSaves()
+	indexBuilds := meta.IndexBuildMeta.GenerateSaves()
+	lastDdlRecords := meta.LastDDLRecords.GenerateSaves()
+
+	return merge(false,
+		ttAliases, aliases,
+		ttCollections, collections,
+		collectionIndexes,
+		segmentIndexes,
+		indexBuilds,
+		lastDdlRecords)
+}
+
+// merge copies every entry of kvs into one map and returns it; on duplicate
+// keys the later map wins. With clone set a fresh map is filled; otherwise
+// kvs[0] is filled in place and returned, so the caller's first map is modified.
+func merge(clone bool, kvs ...map[string]string) map[string]string {
+	if len(kvs) == 0 {
+		return map[string]string{}
+	}
+	ret, rest := kvs[0], kvs[1:]
+	if clone || ret == nil {
+		// Writing into a nil map panics, so a nil first map is replaced by a fresh
+		// one instead of being filled in place.
+		ret, rest = make(map[string]string), kvs
+	}
+	for _, kv := range rest {
+		for k, v := range kv {
+			ret[k] = v
+		}
+	}
+	return ret
+}
+
+// AddAlias records info as the state of alias at ts.
+func (meta *TtAliasesMeta210) AddAlias(alias string, info *pb.CollectionInfo, ts Timestamp) {
+	if _, aliasExist := (*meta)[alias]; !aliasExist {
+		(*meta)[alias] = map[Timestamp]*pb.CollectionInfo{
+			ts: info,
+		}
+	} else {
+		(*meta)[alias][ts] = info
+	}
+}
+
+// GenerateSaves returns one snapshot key per alias and timestamp; a nil record
+// is saved as a tombstone. It panics when a record cannot be marshaled.
+func (meta *TtAliasesMeta210) GenerateSaves() map[string]string {
+	kvs := make(map[string]string)
+	var v []byte
+	var err error
+	for alias := range *meta {
+		for ts := range (*meta)[alias] {
+			k := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, rootcoord.BuildAliasKey210(alias), rootcoord.SnapshotsSep, ts)
+			record := (*meta)[alias][ts]
+			if record == nil {
+				v = rootcoord.ConstructTombstone()
+			} else {
+				v, err = proto.Marshal(record)
+				if err != nil {
+					panic(err)
+				}
+			}
+			kvs[k] = string(v)
+		}
+	}
+	return kvs
+}
+
+// AddAlias records info as the state of alias.
+func (meta *AliasesMeta210) AddAlias(alias string, info *pb.CollectionInfo) {
+	(*meta)[alias] = info
+}
+
+// GenerateSaves returns one key per alias; a nil record is saved as a
+// tombstone. It panics when a record cannot be marshaled.
+func (meta *AliasesMeta210) GenerateSaves() map[string]string {
+	kvs := make(map[string]string)
+	var v []byte
+	var err error
+	for alias := range *meta {
+		record := (*meta)[alias]
+		k := rootcoord.BuildAliasKey210(alias)
+		if record == nil {
+			v = rootcoord.ConstructTombstone()
+		} else {
+			v, err = proto.Marshal(record)
+			if err != nil {
+				panic(err)
+			}
+		}
+		kvs[k] = string(v)
+	}
+	return kvs
+}
+
+// AddCollection records coll as the state of collection collID at ts.
+func (meta *TtCollectionsMeta210) AddCollection(collID UniqueID, coll *pb.CollectionInfo, ts Timestamp) {
+	if _, collExist := (*meta)[collID]; !collExist {
+		(*meta)[collID] = map[Timestamp]*pb.CollectionInfo{
+			ts: coll,
+		}
+	} else {
+		(*meta)[collID][ts] = coll
+	}
+}
+
+// GenerateSaves returns one snapshot key per collection and timestamp; a nil
+// record is saved as a tombstone. It panics when a record cannot be marshaled.
+func (meta *TtCollectionsMeta210) GenerateSaves() map[string]string {
+	kvs := make(map[string]string)
+	var v []byte
+	var err error
+	for collection := range *meta {
+		for ts := range (*meta)[collection] {
+			k := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, rootcoord.BuildCollectionKey(collection), rootcoord.SnapshotsSep, ts)
+			record := (*meta)[collection][ts]
+			if record == nil {
+				v = rootcoord.ConstructTombstone()
+			} else {
+				v, err = proto.Marshal(record)
+				if err != nil {
+					panic(err)
+				}
+			}
+			kvs[k] = string(v)
+		}
+	}
+	return kvs
+}
+
+// AddCollection records coll as the state of collection collID.
+func (meta *CollectionsMeta210) AddCollection(collID UniqueID, coll *pb.CollectionInfo) {
+	(*meta)[collID] = coll
+}
+
+// GenerateSaves returns one key per collection; a nil record is saved as a
+// tombstone. It panics when a record cannot be marshaled.
+func (meta *CollectionsMeta210) GenerateSaves() map[string]string {
+	kvs := make(map[string]string)
+	var v []byte
+	var err error
+	for collection := range *meta {
+		record := (*meta)[collection]
+		k := rootcoord.BuildCollectionKey(collection)
+		if record == nil {
+			v = rootcoord.ConstructTombstone()
+		} else {
+			v, err = proto.Marshal(record)
+			if err != nil {
+				panic(err)
+			}
+		}
+		kvs[k] = string(v)
+	}
+	return kvs
+}
+
+// AddIndex records index under collectionID and indexID.
+func (meta *CollectionIndexesMeta210) AddIndex(collectionID UniqueID, indexID UniqueID, index *pb.IndexInfo) {
+	if _, collExist := (*meta)[collectionID]; !collExist {
+		(*meta)[collectionID] = map[UniqueID]*pb.IndexInfo{
+			indexID: index,
+		}
+	} else {
+		(*meta)[collectionID][indexID] = index
+	}
+}
+
+// GetIndex returns the index stored under collectionID and indexID, or an error when either is unknown.
+func (meta *CollectionIndexesMeta210) GetIndex(collectionID UniqueID, indexID UniqueID) (*pb.IndexInfo, error) {
+	if _, collExist := (*meta)[collectionID]; !collExist {
+		return nil, fmt.Errorf("collection not exist: %d", collectionID)
+	}
+	if _, indexExist := (*meta)[collectionID][indexID]; !indexExist {
+		return nil, fmt.Errorf("index not exist, collection: %d, index: %d", collectionID, indexID)
+	}
+	return (*meta)[collectionID][indexID], nil
+}
+
+// GenerateSaves returns one key per collection and index; a nil record is
+// saved as a tombstone. It panics when a record cannot be marshaled.
+func (meta *CollectionIndexesMeta210) GenerateSaves() map[string]string {
+	kvs := make(map[string]string)
+	var v []byte
+	var err error
+	for collectionID := range *meta {
+		for indexID := range (*meta)[collectionID] {
+			k := legacy.BuildCollectionIndexKey210(collectionID, indexID)
+			record := (*meta)[collectionID][indexID]
+			if record == nil {
+				v = rootcoord.ConstructTombstone()
+			} else {
+				v, err = proto.Marshal(record)
+				if err != nil {
+					panic(err)
+				}
+			}
+			kvs[k] = string(v)
+		}
+	}
+	return kvs
+}
+
+// AddIndex records index under segmentID and indexID.
+func (meta *SegmentIndexesMeta210) AddIndex(segmentID UniqueID, indexID UniqueID, index *pb.SegmentIndexInfo) {
+	if _, segExist := (*meta)[segmentID]; !segExist {
+		(*meta)[segmentID] = map[UniqueID]*pb.SegmentIndexInfo{
+			indexID: index,
+		}
+	} else {
+		(*meta)[segmentID][indexID] = index
+	}
+}
+
+// GenerateSaves returns one key per segment and index; a nil record is saved
+// as a tombstone. It panics when a record cannot be marshaled.
+func (meta *SegmentIndexesMeta210) GenerateSaves() map[string]string {
+	kvs := make(map[string]string)
+	var v []byte
+	var err error
+	for segmentID := range *meta {
+		for indexID := range (*meta)[segmentID] {
+			k := legacy.BuildSegmentIndexKey210(segmentID, indexID)
+			record := (*meta)[segmentID][indexID]
+			if record == nil {
+				v = rootcoord.ConstructTombstone()
+			} else {
+				v, err = proto.Marshal(record)
+				if err != nil {
+					panic(err)
+				}
+			}
+			kvs[k] = string(v)
+		}
+	}
+	return kvs
+}
+
+// AddRecord records record under indexBuildID.
+func (meta *IndexBuildMeta210) AddRecord(indexBuildID UniqueID, record *legacypb.IndexMeta) {
+	(*meta)[indexBuildID] = record
+}
+
+// GenerateSaves returns one key per index build. Unlike the other metas no
+// tombstone is written; it panics when a record cannot be marshaled.
+func (meta *IndexBuildMeta210) GenerateSaves() map[string]string {
+	kvs := make(map[string]string)
+	var v []byte
+	var err error
+	for buildID := range *meta {
+		record := (*meta)[buildID]
+		k := legacy.BuildIndexBuildKey210(buildID)
+		v, err = proto.Marshal(record)
+		if err != nil {
+			panic(err)
+		}
+
+		kvs[k] = string(v)
+	}
+	return kvs
+}
+
+// AddRecord stores fieldIndexes and schema for collectionID, replacing any earlier record.
+func (meta *FieldIndexes210) AddRecord(collectionID UniqueID, fieldIndexes []*pb.FieldIndexInfo, schema *schemapb.CollectionSchema) {
+	(*meta)[collectionID] = &FieldIndexesWithSchema{indexes: fieldIndexes, schema: schema}
+}
+
+// Merge adds every record of other to meta; records of other win on shared collection IDs.
+func (meta *FieldIndexes210) Merge(other FieldIndexes210) {
+	for collectionID := range other {
+		meta.AddRecord(collectionID, (other)[collectionID].indexes, (other)[collectionID].schema)
+	}
+}
+
+// AddRecord stores v under k.
+func (meta *LastDDLRecords) AddRecord(k, v string) {
+	(*meta)[k] = v
+}
+
+// GenerateSaves returns the underlying map itself, not a copy.
+func (meta *LastDDLRecords) GenerateSaves() map[string]string {
+	return *meta
+}
diff --git a/cmd/tools/migration/meta/meta220.go
b/cmd/tools/migration/meta/meta220.go new file mode 100644 index 0000000000..f359b99518 --- /dev/null +++ b/cmd/tools/migration/meta/meta220.go @@ -0,0 +1,248 @@ +package meta + +import ( + "github.com/blang/semver/v4" + "github.com/golang/protobuf/proto" + "github.com/milvus-io/milvus/cmd/tools/migration/versions" + "github.com/milvus-io/milvus/internal/metastore/kv/indexcoord" + "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" + "github.com/milvus-io/milvus/internal/metastore/model" +) + +type TtCollectionsMeta220 map[UniqueID]map[Timestamp]*model.Collection // coll_id -> ts -> coll +type CollectionsMeta220 map[UniqueID]*model.Collection // coll_id -> coll + +type TtAliasesMeta220 map[string]map[Timestamp]*model.Alias // alias name -> ts -> alias +type AliasesMeta220 map[string]*model.Alias // alias name -> alias + +type TtPartitionsMeta220 map[UniqueID]map[Timestamp][]*model.Partition // coll_id -> ts -> partitions +type PartitionsMeta220 map[UniqueID][]*model.Partition // coll_id -> partitions + +type TtFieldsMeta220 map[UniqueID]map[Timestamp][]*model.Field // coll_id -> ts -> fields +type FieldsMeta220 map[UniqueID][]*model.Field // coll_id -> fields + +type CollectionIndexesMeta220 map[UniqueID]map[UniqueID]*model.Index // coll_id -> index_id -> index +type SegmentIndexesMeta220 map[UniqueID]map[UniqueID]*model.SegmentIndex // seg_id -> index_id -> segment index + +func (meta *TtCollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (map[string]string, error) { + saves := make(map[string]string) + + opts := make([]model.Option, 0) + if sourceVersion.LT(versions.Version220) { + opts = append(opts, model.WithFields()) + opts = append(opts, model.WithPartitions()) + } + + for collectionID := range *meta { + for ts := range (*meta)[collectionID] { + ckey := rootcoord.BuildCollectionKey(collectionID) + key := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, ckey, rootcoord.SnapshotsSep, ts) + collection := 
(*meta)[collectionID][ts] + var value string + if collection == nil { + // save a tombstone. + value = string(rootcoord.ConstructTombstone()) + } else { + collectionPb := model.MarshalCollectionModelWithOption(collection, opts...) + marshaledCollectionPb, err := proto.Marshal(collectionPb) + if err != nil { + return nil, err + } + value = string(marshaledCollectionPb) + } + saves[key] = value + } + } + + return saves, nil +} + +func (meta *TtCollectionsMeta220) AddCollection(collectionID UniqueID, coll *model.Collection, ts Timestamp) { + _, collExist := (*meta)[collectionID] + if collExist { + (*meta)[collectionID][ts] = coll + } else { + (*meta)[collectionID] = map[Timestamp]*model.Collection{ + ts: coll, + } + } +} + +func (meta *CollectionsMeta220) AddCollection(collectionID UniqueID, coll *model.Collection) { + (*meta)[collectionID] = coll +} + +func (meta *CollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (map[string]string, error) { + saves := make(map[string]string) + + opts := make([]model.Option, 0) + if sourceVersion.LT(versions.Version220) { + opts = append(opts, model.WithFields()) + opts = append(opts, model.WithPartitions()) + } + + for collectionID := range *meta { + ckey := rootcoord.BuildCollectionKey(collectionID) + collection := (*meta)[collectionID] + var value string + if collection == nil { + // save a tombstone. + value = string(rootcoord.ConstructTombstone()) + } else { + collectionPb := model.MarshalCollectionModelWithOption(collection, opts...) 
+ marshaledCollectionPb, err := proto.Marshal(collectionPb) + if err != nil { + return nil, err + } + value = string(marshaledCollectionPb) + } + saves[ckey] = value + } + + return saves, nil +} + +func (meta *TtAliasesMeta220) AddAlias(alias string, aliasInfo *model.Alias, ts Timestamp) { + _, aliasExist := (*meta)[alias] + if aliasExist { + (*meta)[alias][ts] = aliasInfo + } else { + (*meta)[alias] = map[Timestamp]*model.Alias{ + ts: aliasInfo, + } + } +} + +func (meta *TtAliasesMeta220) GenerateSaves() (map[string]string, error) { + saves := make(map[string]string) + + for alias := range *meta { + for ts := range (*meta)[alias] { + ckey := rootcoord.BuildAliasKey(alias) + key := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, ckey, rootcoord.SnapshotsSep, ts) + aliasInfo := (*meta)[alias][ts] + var value string + if aliasInfo == nil { + // save a tombstone. + value = string(rootcoord.ConstructTombstone()) + } else { + aliasPb := model.MarshalAliasModel(aliasInfo) + marshaledAliasPb, err := proto.Marshal(aliasPb) + if err != nil { + return nil, err + } + value = string(marshaledAliasPb) + } + saves[key] = value + } + } + + return saves, nil +} + +func (meta *AliasesMeta220) AddAlias(alias string, aliasInfo *model.Alias) { + (*meta)[alias] = aliasInfo +} + +func (meta *AliasesMeta220) GenerateSaves() (map[string]string, error) { + saves := make(map[string]string) + + for alias := range *meta { + ckey := rootcoord.BuildAliasKey(alias) + aliasInfo := (*meta)[alias] + var value string + if aliasInfo == nil { + // save a tombstone. 
+ value = string(rootcoord.ConstructTombstone()) + } else { + aliasPb := model.MarshalAliasModel(aliasInfo) + marshaledAliasPb, err := proto.Marshal(aliasPb) + if err != nil { + return nil, err + } + value = string(marshaledAliasPb) + } + saves[ckey] = value + } + + return saves, nil +} + +func (meta *CollectionIndexesMeta220) AddRecord(collID UniqueID, indexID int64, record *model.Index) { + if _, collExist := (*meta)[collID]; !collExist { + (*meta)[collID] = map[UniqueID]*model.Index{ + indexID: record, + } + } else { + (*meta)[collID][indexID] = record + } +} + +func (meta *CollectionIndexesMeta220) GenerateSaves() (map[string]string, error) { + saves := make(map[string]string) + + for collectionID := range *meta { + for indexID := range (*meta)[collectionID] { + ckey := indexcoord.BuildIndexKey(collectionID, indexID) + index := (*meta)[collectionID][indexID] + var value string + indexPb := model.MarshalIndexModel(index) + marshaledIndexPb, err := proto.Marshal(indexPb) + if err != nil { + return nil, err + } + value = string(marshaledIndexPb) + saves[ckey] = value + } + } + + return saves, nil +} + +func (meta *SegmentIndexesMeta220) GenerateSaves() (map[string]string, error) { + saves := make(map[string]string) + + for segmentID := range *meta { + for indexID := range (*meta)[segmentID] { + index := (*meta)[segmentID][indexID] + ckey := indexcoord.BuildSegmentIndexKey(index.CollectionID, index.PartitionID, index.SegmentID, index.BuildID) + var value string + indexPb := model.MarshalSegmentIndexModel(index) + marshaledIndexPb, err := proto.Marshal(indexPb) + if err != nil { + return nil, err + } + value = string(marshaledIndexPb) + saves[ckey] = value + } + } + + return saves, nil +} + +func (meta *SegmentIndexesMeta220) AddRecord(segID UniqueID, indexID UniqueID, record *model.SegmentIndex) { + if _, segExist := (*meta)[segID]; !segExist { + (*meta)[segID] = map[UniqueID]*model.SegmentIndex{ + indexID: record, + } + } else { + (*meta)[segID][indexID] = record 
+ } +} + +type All220 struct { + TtCollections TtCollectionsMeta220 + Collections CollectionsMeta220 + + TtAliases TtAliasesMeta220 + Aliases AliasesMeta220 + + TtPartitions TtPartitionsMeta220 + Partitions PartitionsMeta220 + + TtFields TtFieldsMeta220 + Fields FieldsMeta220 + + CollectionIndexes CollectionIndexesMeta220 + SegmentIndexes SegmentIndexesMeta220 +} diff --git a/cmd/tools/migration/migration/210_to_220.go b/cmd/tools/migration/migration/210_to_220.go new file mode 100644 index 0000000000..edb28f2673 --- /dev/null +++ b/cmd/tools/migration/migration/210_to_220.go @@ -0,0 +1,16 @@ +package migration + +import ( + "github.com/milvus-io/milvus/cmd/tools/migration/meta" +) + +type migrator210To220 struct { +} + +func (m migrator210To220) Migrate(metas *meta.Meta) (*meta.Meta, error) { + return meta.From210To220(metas) +} + +func newMigrator210To220() *migrator210To220 { + return &migrator210To220{} +} diff --git a/cmd/tools/migration/migration/constant.go b/cmd/tools/migration/migration/constant.go new file mode 100644 index 0000000000..8463fa261c --- /dev/null +++ b/cmd/tools/migration/migration/constant.go @@ -0,0 +1,18 @@ +package migration + +import "github.com/milvus-io/milvus/internal/util/typeutil" + +const ( + Role = "migration" +) + +var Roles = []string{ + typeutil.RootCoordRole, + typeutil.IndexCoordRole, + typeutil.IndexNodeRole, + typeutil.DataCoordRole, + typeutil.DataNodeRole, + typeutil.QueryCoordRole, + typeutil.QueryNodeRole, + typeutil.ProxyRole, +} diff --git a/cmd/tools/migration/migration/migration.go b/cmd/tools/migration/migration/migration.go new file mode 100644 index 0000000000..2698ef99ce --- /dev/null +++ b/cmd/tools/migration/migration/migration.go @@ -0,0 +1,21 @@ +package migration + +type Migration interface { + // Validate if migration can be executed. For example, higher version to lower version is not allowed. + Validate() error + // CheckCompatible check if target is compatible with source. 
If compatible, no migration should be executed. + CheckCompatible() bool + // CheckSessions check if any sessions are alive. Abort migration if any. + CheckSessions() error + // RegisterSession registers a session so that no other migration can run concurrently; the registered session is deleted + // as soon as possible after migration is done. + RegisterSession() error + // Backup source meta information. + Backup() error + // Migrate to target backend. + Migrate() error + // Rollback migration. + Rollback() error + // Stop completes the migration workflow. + Stop() +} diff --git a/cmd/tools/migration/migration/migrator.go b/cmd/tools/migration/migration/migrator.go new file mode 100644 index 0000000000..e220f71dca --- /dev/null +++ b/cmd/tools/migration/migration/migrator.go @@ -0,0 +1,32 @@ +package migration + +import ( + "fmt" + + "github.com/blang/semver/v4" + "github.com/milvus-io/milvus/cmd/tools/migration/meta" + "github.com/milvus-io/milvus/cmd/tools/migration/versions" +) + +type Migrator interface { + Migrate(metas *meta.Meta) (*meta.Meta, error) +} + +func NewMigrator(sourceVersion, targetVersion string) (Migrator, error) { + source, err := semver.Parse(sourceVersion) + if err != nil { + return nil, err + } + + target, err := semver.Parse(targetVersion) + if err != nil { + return nil, err + } + + if versions.Range21x(source) && versions.Range22x(target) { + return newMigrator210To220(), nil + } + + return nil, fmt.Errorf("migration from source version to target version is forbidden, source: %s, target: %s", + sourceVersion, targetVersion) +} diff --git a/cmd/tools/migration/migration/runner.go b/cmd/tools/migration/migration/runner.go new file mode 100644 index 0000000000..5513845dad --- /dev/null +++ b/cmd/tools/migration/migration/runner.go @@ -0,0 +1,214 @@ +package migration + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/milvus-io/milvus/cmd/tools/migration/versions" + + "github.com/blang/semver/v4" + + 
"github.com/milvus-io/milvus/cmd/tools/migration/configs" + + "github.com/milvus-io/milvus/cmd/tools/migration/console" + + "github.com/milvus-io/milvus/cmd/tools/migration/backend" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/sessionutil" +) + +type Runner struct { + ctx context.Context + cancel context.CancelFunc + cfg *configs.Config + initOnce sync.Once + session *sessionutil.Session + address string + etcdCli *clientv3.Client + wg sync.WaitGroup +} + +func NewRunner(ctx context.Context, cfg *configs.Config) *Runner { + ctx1, cancel := context.WithCancel(ctx) + runner := &Runner{ctx: ctx1, cancel: cancel, cfg: cfg} + runner.initOnce.Do(runner.init) + return runner +} + +func (r *Runner) watchByPrefix(prefix string) { + defer r.wg.Done() + _, revision, err := r.session.GetSessions(prefix) + console.ExitIf(err) + eventCh := r.session.WatchServices(prefix, revision, nil) + for { + select { + case <-r.ctx.Done(): + return + case event := <-eventCh: + msg := fmt.Sprintf("session up/down, exit migration, event type: %s, session: %s", event.EventType.String(), event.Session.String()) + console.Exit(msg) + } + } +} + +func (r *Runner) WatchSessions() { + for _, role := range Roles { + r.wg.Add(1) + go r.watchByPrefix(role) + } +} + +func (r *Runner) initEtcdCli() { + cli, err := etcd.GetEtcdClient(r.cfg.EtcdCfg) + console.ExitIf(err) + r.etcdCli = cli +} + +func (r *Runner) init() { + r.initEtcdCli() + + r.session = sessionutil.NewSession(r.ctx, r.cfg.EtcdCfg.MetaRootPath, r.etcdCli) + // address not important here. 
+ address := time.Now().String() + r.address = address + r.session.Init(Role, address, true, true) + r.WatchSessions() +} + +func (r *Runner) Validate() error { + source, err := semver.Parse(r.cfg.SourceVersion) + if err != nil { + return err + } + target, err := semver.Parse(r.cfg.TargetVersion) + if err != nil { + return err + } + can := target.GTE(source) + if can { + return nil + } + return fmt.Errorf("higher version to lower version is not allowed, "+ + "source version: %s, target version: %s", source.String(), target.String()) +} + +func (r *Runner) CheckCompatible() bool { + // TODO: more accurate. + target, err := semver.Parse(r.cfg.TargetVersion) + if err != nil { + return false + } + return target.LT(versions.Version220) +} + +func (r *Runner) checkSessionsWithPrefix(prefix string) error { + sessions, _, err := r.session.GetSessions(prefix) + if err != nil { + return err + } + if len(sessions) > 0 { + return fmt.Errorf("there are still sessions alive, prefix: %s, num of alive sessions: %d", prefix, len(sessions)) + } + return nil +} + +func (r *Runner) checkMySelf() error { + sessions, _, err := r.session.GetSessions(Role) + if err != nil { + return err + } + for _, session := range sessions { + if session.Address != r.address { + return fmt.Errorf("other migration is running") + } + } + return nil +} + +func (r *Runner) CheckSessions() error { + if err := r.checkMySelf(); err != nil { + return err + } + for _, prefix := range Roles { + if err := r.checkSessionsWithPrefix(prefix); err != nil { + return err + } + } + return nil +} + +func (r *Runner) RegisterSession() error { + r.session.Register() + go r.session.LivenessCheck(r.ctx, func() {}) + return nil +} + +func (r *Runner) Backup() error { + source, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.SourceVersion) + if err != nil { + return err + } + metas, err := source.Load() + if err != nil { + return err + } + return source.Backup(metas, r.cfg.BackupFilePath) +} + +func (r *Runner) Rollback() 
error { + source, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.SourceVersion) + if err != nil { + return err + } + target, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.TargetVersion) + if err != nil { + return err + } + if err := source.Clean(); err != nil { + return err + } + if err := target.Clean(); err != nil { + return err + } + return source.Restore(r.cfg.BackupFilePath) +} + +func (r *Runner) Migrate() error { + migrator, err := NewMigrator(r.cfg.SourceVersion, r.cfg.TargetVersion) + if err != nil { + return err + } + source, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.SourceVersion) + if err != nil { + return err + } + metas, err := source.Load() + if err != nil { + return err + } + if err := source.Backup(metas, r.cfg.BackupFilePath); err != nil { + return err + } + if err := source.Clean(); err != nil { + return err + } + targetMetas, err := migrator.Migrate(metas) + if err != nil { + return err + } + target, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.TargetVersion) + if err != nil { + return err + } + return target.Save(targetMetas) +} + +func (r *Runner) Stop() { + r.session.Revoke(time.Second) + r.cancel() + r.wg.Wait() +} diff --git a/cmd/tools/migration/utils/util.go b/cmd/tools/migration/utils/util.go new file mode 100644 index 0000000000..e5feb7264a --- /dev/null +++ b/cmd/tools/migration/utils/util.go @@ -0,0 +1,49 @@ +package utils + +import ( + "fmt" + "strconv" + "strings" + + "github.com/milvus-io/milvus/internal/util/typeutil" + + "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" +) + +type UniqueID = typeutil.UniqueID +type Timestamp = typeutil.Timestamp + +type errNotOfTsKey struct { + key string +} + +func (e errNotOfTsKey) Error() string { + return fmt.Sprintf("%s is not of snapshot", e.key) +} + +func NewErrNotOfTsKey(key string) *errNotOfTsKey { + return &errNotOfTsKey{key: key} +} + +func IsErrNotOfTsKey(err error) bool { + _, ok := err.(*errNotOfTsKey) + return ok +} + +func 
SplitBySeparator(s string) (key string, ts Timestamp, err error) { + got := strings.Split(s, rootcoord.SnapshotsSep) + if len(got) != 2 { + return "", 0, NewErrNotOfTsKey(s) + } + convertedTs, err := strconv.Atoi(got[1]) + if err != nil { + return "", 0, fmt.Errorf("%s is not of snapshot", s) + } + return got[0], Timestamp(convertedTs), nil +} + +func GetFileName(p string) string { + got := strings.Split(p, "/") + l := len(got) + return got[l-1] +} diff --git a/cmd/tools/migration/utils/util_test.go b/cmd/tools/migration/utils/util_test.go new file mode 100644 index 0000000000..fd1233c31c --- /dev/null +++ b/cmd/tools/migration/utils/util_test.go @@ -0,0 +1,40 @@ +package utils + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSplitBySeparator(t *testing.T) { + tsKey := "435783141193354561_ts435783141193154564" + k, ts, err := SplitBySeparator(tsKey) + assert.NoError(t, err) + assert.Equal(t, "435783141193354561", k) + assert.Equal(t, Timestamp(435783141193154564), ts) +} + +func TestGetFileName(t *testing.T) { + type args struct { + p string + } + tests := []struct { + name string + args args + want string + }{ + { + args: args{p: "snapshots/root-coord/collection/436611447439428929_ts436611447439228933"}, + want: "436611447439428929_ts436611447439228933", + }, + { + args: args{p: "root-coord/collection/436611447439428929"}, + want: "436611447439428929", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, GetFileName(tt.args.p), "GetFileName(%v)", tt.args.p) + }) + } +} diff --git a/cmd/tools/migration/versions/version.go b/cmd/tools/migration/versions/version.go new file mode 100644 index 0000000000..65162b8983 --- /dev/null +++ b/cmd/tools/migration/versions/version.go @@ -0,0 +1,32 @@ +package versions + +import "github.com/blang/semver/v4" + +const ( + version210Str = "2.1.0" + version220Str = "2.2.0" + version230Str = "2.3.0" + VersionMaxStr = "1000.1000.1000" +) + +var ( + 
Version220 semver.Version + Version210 semver.Version + Version230 semver.Version + VersionMax semver.Version +) + +func init() { + Version210, _ = semver.Parse(version210Str) + Version220, _ = semver.Parse(version220Str) + Version230, _ = semver.Parse(version230Str) + VersionMax, _ = semver.Parse(VersionMaxStr) +} + +func Range21x(version semver.Version) bool { + return version.GTE(Version210) && version.LT(Version220) +} + +func Range22x(version semver.Version) bool { + return version.GTE(Version220) && version.LT(Version230) +} diff --git a/cmd/tools/migration/versions/version_test.go b/cmd/tools/migration/versions/version_test.go new file mode 100644 index 0000000000..6e9c508b1f --- /dev/null +++ b/cmd/tools/migration/versions/version_test.go @@ -0,0 +1,77 @@ +package versions + +import ( + "testing" + + "github.com/blang/semver/v4" +) + +func TestRange21x(t *testing.T) { + type args struct { + version semver.Version + } + tests := []struct { + name string + args args + want bool + }{ + { + args: args{version: VersionMax}, + want: false, + }, + { + args: args{version: Version230}, + want: false, + }, + { + args: args{version: Version220}, + want: false, + }, + { + args: args{version: Version210}, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := Range21x(tt.args.version); got != tt.want { + t.Errorf("Range21x() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestRange22x(t *testing.T) { + type args struct { + version semver.Version + } + tests := []struct { + name string + args args + want bool + }{ + { + args: args{version: VersionMax}, + want: false, + }, + { + args: args{version: Version230}, + want: false, + }, + { + args: args{version: Version220}, + want: true, + }, + { + args: args{version: Version210}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := Range22x(tt.args.version); got != tt.want { + t.Errorf("Range22x() = %v, want %v", got, 
tt.want) + } + }) + } +} diff --git a/go.mod b/go.mod index 9c6d084e00..d59c6d8580 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( github.com/99designs/keyring v1.2.1 // indirect github.com/BurntSushi/toml v1.0.0 - github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect + github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/StackExchange/wmi v1.2.1 // indirect github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e @@ -20,7 +20,6 @@ require ( github.com/gin-gonic/gin v1.7.7 github.com/go-basic/ipv4 v1.0.0 github.com/gofrs/flock v0.8.1 - github.com/golang/mock v1.5.0 github.com/golang/protobuf v1.5.2 github.com/google/btree v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 @@ -61,9 +60,14 @@ require ( stathat.com/c/consistent v1.0.0 ) -require github.com/apache/thrift v0.15.0 +require ( + github.com/apache/thrift v0.15.0 // indirect + github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d +) -require github.com/sandertv/go-formula/v2 v2.0.0-alpha.7 // indirect +require github.com/sandertv/go-formula/v2 v2.0.0-alpha.7 + +require github.com/quasilyte/go-ruleguard/dsl v0.3.21 // indirect require ( github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect @@ -141,7 +145,6 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect - github.com/quasilyte/go-ruleguard/dsl v0.3.21 // indirect github.com/rs/xid v1.2.1 // indirect github.com/samber/lo v1.27.0 github.com/sirupsen/logrus v1.8.1 // indirect @@ -202,8 +205,6 @@ replace ( github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect ) -replace ( - // If you want to use the hook interceptor, the following code should be commented out - // and you should modify the api version to be the same as the `so` project. 
- github.com/milvus-io/milvus/api => ./api -) +// If you want to use the hook interceptor, the following code should be commented out +// and you should modify the api version to be the same as the `so` project. +replace github.com/milvus-io/milvus/api => ./api diff --git a/go.sum b/go.sum index 5611db7dc6..a2d774d42d 100644 --- a/go.sum +++ b/go.sum @@ -463,7 +463,6 @@ github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b h1:xYEM2oBUhBE github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b/go.mod h1:V0HF/ZBlN86HqewcDC/cVxMmYDiRukWjSrgKLUAn9Js= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= -github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 h1:IVlcvV0CjvfBYYod5ePe89l+3LBAl//6n9kJ9Vr2i0k= github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76/go.mod h1:Iu9BHUvTh8/KpbuSoKx/CaJEdJvFxSverxIy7I+nq7s= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= @@ -486,14 +485,13 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.8 h1:3tS41NlGYSmhhe/8fhGRzc+z3AYCw1Fe1WAyLuujKs0= github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= -github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI= +github.com/mgutz/ansi 
v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus/api v0.0.0-20220915082433-b1f4c60117bb h1:vKuSxdR+egH8P7QOgO/WP0c2JpLnGPpt1DF6bSE6tmQ= -github.com/milvus-io/milvus/api v0.0.0-20220915082433-b1f4c60117bb/go.mod h1:ZhlXJ8rZ90MBqJZNniyzcT8dibpZ8wRW2c1Tcb8/6oo= github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100= github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= @@ -502,8 +500,6 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= -github.com/minio/minio-go/v7 v7.0.10 h1:1oUKe4EOPUEhw2qnPQaPsJ0lmVTYLFu03SiItauXs94= -github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo= github.com/minio/minio-go/v7 v7.0.17 h1:5SiS3pqiQDbNhmXMxtqn2HzAInbN5cbHT7ip9F0F07E= github.com/minio/minio-go/v7 v7.0.17/go.mod h1:SyQ1IFeJuaa+eV5yEDxW7hYE1s5VVq5sgImDe27R+zg= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= @@ -676,6 +672,7 @@ github.com/stretchr/testify v1.7.4 h1:wZRexSlwd7ZXfKINDLsO4r7WBt3gTKONc6K/VesHvH github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv 
v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ= @@ -775,7 +772,6 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= @@ -795,7 +791,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 h1:/5Bs7sWi0i3rOVO5KnM55OwugpsD4bRW1zywKoZjbkI= golang.org/x/exp v0.0.0-20211216164055-b2b84827b756/go.mod h1:b9TAUYHmRtqA6klRHApnXMnj+OyLce4yF5cZCUbk2ps= golang.org/x/exp 
v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= diff --git a/internal/metastore/kv/indexcoord/kv_catalog.go b/internal/metastore/kv/indexcoord/kv_catalog.go index d259d3ed14..f57752c1ed 100644 --- a/internal/metastore/kv/indexcoord/kv_catalog.go +++ b/internal/metastore/kv/indexcoord/kv_catalog.go @@ -21,16 +21,16 @@ type Catalog struct { Txn kv.TxnKV } -func buildIndexKey(collectionID, indexID int64) string { +func BuildIndexKey(collectionID, indexID int64) string { return fmt.Sprintf("%s/%d/%d", util.FieldIndexPrefix, collectionID, indexID) } -func buildSegmentIndexKey(collectionID, partitionID, segmentID, buildID int64) string { +func BuildSegmentIndexKey(collectionID, partitionID, segmentID, buildID int64) string { return fmt.Sprintf("%s/%d/%d/%d/%d", util.SegmentIndexPrefix, collectionID, partitionID, segmentID, buildID) } func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error { - key := buildIndexKey(index.CollectionID, index.IndexID) + key := BuildIndexKey(index.CollectionID, index.IndexID) value, err := proto.Marshal(model.MarshalIndexModel(index)) if err != nil { @@ -73,7 +73,7 @@ func (kc *Catalog) AlterIndex(ctx context.Context, index *model.Index) error { func (kc *Catalog) AlterIndexes(ctx context.Context, indexes []*model.Index) error { kvs := make(map[string]string) for _, index := range indexes { - key := buildIndexKey(index.CollectionID, index.IndexID) + key := BuildIndexKey(index.CollectionID, index.IndexID) value, err := proto.Marshal(model.MarshalIndexModel(index)) if err != nil { @@ -86,7 +86,7 @@ func (kc *Catalog) AlterIndexes(ctx context.Context, indexes []*model.Index) err } func (kc *Catalog) DropIndex(ctx context.Context, collID typeutil.UniqueID, dropIdxID typeutil.UniqueID) error { - key := buildIndexKey(collID, dropIdxID) + key := BuildIndexKey(collID, dropIdxID) err := 
kc.Txn.Remove(key) if err != nil { @@ -99,7 +99,7 @@ func (kc *Catalog) DropIndex(ctx context.Context, collID typeutil.UniqueID, drop } func (kc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error { - key := buildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID) + key := BuildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID) value, err := proto.Marshal(model.MarshalSegmentIndexModel(segIdx)) if err != nil { @@ -143,7 +143,7 @@ func (kc *Catalog) AlterSegmentIndex(ctx context.Context, segIdx *model.SegmentI func (kc *Catalog) AlterSegmentIndexes(ctx context.Context, segIdxes []*model.SegmentIndex) error { kvs := make(map[string]string) for _, segIdx := range segIdxes { - key := buildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID) + key := BuildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID) value, err := proto.Marshal(model.MarshalSegmentIndexModel(segIdx)) if err != nil { return err @@ -154,7 +154,7 @@ func (kc *Catalog) AlterSegmentIndexes(ctx context.Context, segIdxes []*model.Se } func (kc *Catalog) DropSegmentIndex(ctx context.Context, collID, partID, segID, buildID typeutil.UniqueID) error { - key := buildSegmentIndexKey(collID, partID, segID, buildID) + key := BuildSegmentIndexKey(collID, partID, segID, buildID) err := kc.Txn.Remove(key) if err != nil { diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index 5b8b44e7c8..0b66a5356a 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -40,27 +40,31 @@ type Catalog struct { Snapshot kv.SnapShotKV } -func buildCollectionKey(collectionID typeutil.UniqueID) string { +func BuildCollectionKey(collectionID typeutil.UniqueID) string { return fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID) } -func 
buildPartitionPrefix(collectionID typeutil.UniqueID) string { +func BuildPartitionPrefix(collectionID typeutil.UniqueID) string { return fmt.Sprintf("%s/%d", PartitionMetaPrefix, collectionID) } -func buildPartitionKey(collectionID, partitionID typeutil.UniqueID) string { - return fmt.Sprintf("%s/%d", buildPartitionPrefix(collectionID), partitionID) +func BuildPartitionKey(collectionID, partitionID typeutil.UniqueID) string { + return fmt.Sprintf("%s/%d", BuildPartitionPrefix(collectionID), partitionID) } -func buildFieldPrefix(collectionID typeutil.UniqueID) string { +func BuildFieldPrefix(collectionID typeutil.UniqueID) string { return fmt.Sprintf("%s/%d", FieldMetaPrefix, collectionID) } -func buildFieldKey(collectionID typeutil.UniqueID, fieldID int64) string { - return fmt.Sprintf("%s/%d", buildFieldPrefix(collectionID), fieldID) +func BuildFieldKey(collectionID typeutil.UniqueID, fieldID int64) string { + return fmt.Sprintf("%s/%d", BuildFieldPrefix(collectionID), fieldID) } -func buildAliasKey(aliasName string) string { +func BuildAliasKey210(alias string) string { + return fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix210, alias) +} + +func BuildAliasKey(aliasName string) string { return fmt.Sprintf("%s/%s", AliasMetaPrefix, aliasName) } @@ -83,7 +87,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, return fmt.Errorf("cannot create collection with state: %s, collection: %s", coll.State.String(), coll.Name) } - k1 := buildCollectionKey(coll.CollectionID) + k1 := BuildCollectionKey(coll.CollectionID) collInfo := model.MarshalCollectionModel(coll) v1, err := proto.Marshal(collInfo) if err != nil { @@ -104,7 +108,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, // save partition info to newly path. 
for _, partition := range coll.Partitions { - k := buildPartitionKey(coll.CollectionID, partition.PartitionID) + k := BuildPartitionKey(coll.CollectionID, partition.PartitionID) partitionInfo := model.MarshalPartitionModel(partition) v, err := proto.Marshal(partitionInfo) if err != nil { @@ -117,7 +121,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, // save fields info to newly path. for _, field := range coll.Fields { - k := buildFieldKey(coll.CollectionID, field.FieldID) + k := BuildFieldKey(coll.CollectionID, field.FieldID) fieldInfo := model.MarshalFieldModel(field) v, err := proto.Marshal(fieldInfo) if err != nil { @@ -134,7 +138,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, } func (kc *Catalog) loadCollection(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { - collKey := buildCollectionKey(collectionID) + collKey := BuildCollectionKey(collectionID) collVal, err := kc.Snapshot.Load(collKey, ts) if err != nil { return nil, common.NewCollectionNotExistError(fmt.Sprintf("collection not found: %d", collectionID)) @@ -167,7 +171,7 @@ func (kc *Catalog) CreatePartition(ctx context.Context, partition *model.Partiti if partitionVersionAfter210(collMeta) { // save to newly path. 
- k := buildPartitionKey(partition.CollectionID, partition.PartitionID) + k := BuildPartitionKey(partition.CollectionID, partition.PartitionID) partitionInfo := model.MarshalPartitionModel(partition) v, err := proto.Marshal(partitionInfo) if err != nil { @@ -189,7 +193,7 @@ func (kc *Catalog) CreatePartition(ctx context.Context, partition *model.Partiti collMeta.PartitionNames = append(collMeta.PartitionNames, partition.PartitionName) collMeta.PartitionCreatedTimestamps = append(collMeta.PartitionCreatedTimestamps, partition.PartitionCreatedTimestamp) - k := buildCollectionKey(partition.CollectionID) + k := BuildCollectionKey(partition.CollectionID) v, err := proto.Marshal(collMeta) if err != nil { return err @@ -198,8 +202,8 @@ func (kc *Catalog) CreatePartition(ctx context.Context, partition *model.Partiti } func (kc *Catalog) CreateAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error { - oldKBefore210 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias.Name) - k := buildAliasKey(alias.Name) + oldKBefore210 := BuildAliasKey210(alias.Name) + k := BuildAliasKey(alias.Name) aliasInfo := model.MarshalAliasModel(alias) v, err := proto.Marshal(aliasInfo) if err != nil { @@ -231,7 +235,7 @@ func (kc *Catalog) AlterCredential(ctx context.Context, credential *model.Creden } func (kc *Catalog) listPartitionsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Partition, error) { - prefix := buildPartitionPrefix(collectionID) + prefix := BuildPartitionPrefix(collectionID) _, values, err := kc.Snapshot.LoadWithPrefix(prefix, ts) if err != nil { return nil, err @@ -253,7 +257,7 @@ func fieldVersionAfter210(collMeta *pb.CollectionInfo) bool { } func (kc *Catalog) listFieldsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Field, error) { - prefix := buildFieldPrefix(collectionID) + prefix := BuildFieldPrefix(collectionID) _, values, err := 
kc.Snapshot.LoadWithPrefix(prefix, ts) if err != nil { return nil, err @@ -324,22 +328,22 @@ func (kc *Catalog) AlterAlias(ctx context.Context, alias *model.Alias, ts typeut } func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error { - collectionKey := buildCollectionKey(collectionInfo.CollectionID) + collectionKey := BuildCollectionKey(collectionInfo.CollectionID) var delMetakeysSnap []string for _, alias := range collectionInfo.Aliases { delMetakeysSnap = append(delMetakeysSnap, - fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias), + BuildAliasKey210(alias), ) } // Snapshot will list all (k, v) pairs and then use Txn.MultiSave to save tombstone for these keys when it prepares // to remove a prefix, so though we have very few prefixes, the final operations may exceed the max txn number. // TODO(longjiquan): should we list all partitions & fields in KV anyway? for _, partition := range collectionInfo.Partitions { - delMetakeysSnap = append(delMetakeysSnap, buildPartitionKey(collectionInfo.CollectionID, partition.PartitionID)) + delMetakeysSnap = append(delMetakeysSnap, BuildPartitionKey(collectionInfo.CollectionID, partition.PartitionID)) } for _, field := range collectionInfo.Fields { - delMetakeysSnap = append(delMetakeysSnap, buildFieldKey(collectionInfo.CollectionID, field.FieldID)) + delMetakeysSnap = append(delMetakeysSnap, BuildFieldKey(collectionInfo.CollectionID, field.FieldID)) } // delMetakeysSnap = append(delMetakeysSnap, buildPartitionPrefix(collectionInfo.CollectionID)) // delMetakeysSnap = append(delMetakeysSnap, buildFieldPrefix(collectionInfo.CollectionID)) @@ -370,7 +374,7 @@ func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *mod oldCollClone.CreateTime = newColl.CreateTime oldCollClone.ConsistencyLevel = newColl.ConsistencyLevel oldCollClone.State = newColl.State - key := buildCollectionKey(oldColl.CollectionID) + key := 
BuildCollectionKey(oldColl.CollectionID) value, err := proto.Marshal(model.MarshalCollectionModel(oldCollClone)) if err != nil { return err @@ -394,7 +398,7 @@ func (kc *Catalog) alterModifyPartition(oldPart *model.Partition, newPart *model oldPartClone.PartitionName = newPartClone.PartitionName oldPartClone.PartitionCreatedTimestamp = newPartClone.PartitionCreatedTimestamp oldPartClone.State = newPartClone.State - key := buildPartitionKey(oldPart.CollectionID, oldPart.PartitionID) + key := BuildPartitionKey(oldPart.CollectionID, oldPart.PartitionID) value, err := proto.Marshal(model.MarshalPartitionModel(oldPartClone)) if err != nil { return err @@ -437,11 +441,11 @@ func (kc *Catalog) DropPartition(ctx context.Context, collectionID typeutil.Uniq } if partitionVersionAfter210(collMeta) { - k := buildPartitionKey(collectionID, partitionID) + k := BuildPartitionKey(collectionID, partitionID) return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k}, ts) } - k := buildCollectionKey(collectionID) + k := BuildCollectionKey(collectionID) dropPartition(collMeta, partitionID) v, err := proto.Marshal(collMeta) if err != nil { @@ -462,8 +466,8 @@ func (kc *Catalog) DropCredential(ctx context.Context, username string) error { } func (kc *Catalog) DropAlias(ctx context.Context, alias string, ts typeutil.Timestamp) error { - oldKBefore210 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias) - k := buildAliasKey(alias) + oldKBefore210 := BuildAliasKey210(alias) + k := BuildAliasKey(alias) return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k, oldKBefore210}, ts) } @@ -519,7 +523,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, ts typeutil.Timestamp) ( } func (kc *Catalog) listAliasesBefore210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) { - _, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, ts) + _, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix210, ts) if err != nil { return nil, 
err } diff --git a/internal/metastore/kv/rootcoord/kv_catalog_test.go b/internal/metastore/kv/rootcoord/kv_catalog_test.go index c9fa9f0471..32b45d9a86 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog_test.go +++ b/internal/metastore/kv/rootcoord/kv_catalog_test.go @@ -770,7 +770,7 @@ func TestCatalog_AlterCollection(t *testing.T) { newC := &model.Collection{CollectionID: collectionID, State: pb.CollectionState_CollectionCreated} err := kc.AlterCollection(ctx, oldC, newC, metastore.MODIFY, 0) assert.NoError(t, err) - key := buildCollectionKey(collectionID) + key := BuildCollectionKey(collectionID) value, ok := kvs[key] assert.True(t, ok) var collPb pb.CollectionInfo @@ -821,7 +821,7 @@ func TestCatalog_AlterPartition(t *testing.T) { newP := &model.Partition{PartitionID: partitionID, CollectionID: collectionID, State: pb.PartitionState_PartitionCreated} err := kc.AlterPartition(ctx, oldP, newP, metastore.MODIFY, 0) assert.NoError(t, err) - key := buildPartitionKey(collectionID, partitionID) + key := BuildPartitionKey(collectionID, partitionID) value, ok := kvs[key] assert.True(t, ok) var partPb pb.PartitionInfo diff --git a/internal/metastore/kv/rootcoord/rootcoord_constant.go b/internal/metastore/kv/rootcoord/rootcoord_constant.go index d01f86c02a..2741eeef64 100644 --- a/internal/metastore/kv/rootcoord/rootcoord_constant.go +++ b/internal/metastore/kv/rootcoord/rootcoord_constant.go @@ -11,8 +11,11 @@ const ( AliasMetaPrefix = ComponentPrefix + "/aliases" FieldMetaPrefix = ComponentPrefix + "/fields" - // CollectionAliasMetaPrefix prefix for collection alias meta - CollectionAliasMetaPrefix = ComponentPrefix + "/collection-alias" + // CollectionAliasMetaPrefix210 prefix for collection alias meta + CollectionAliasMetaPrefix210 = ComponentPrefix + "/collection-alias" + + SnapshotsSep = "_ts" + SnapshotPrefix = "snapshots" // CommonCredentialPrefix subpath for common credential /* #nosec G101 */ diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go 
b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 2273c9f757..e458026f74 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -27,6 +27,8 @@ import ( "strings" "sync" + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" @@ -39,6 +41,16 @@ var ( SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} ) +// IsTombstone used in migration tool also. +func IsTombstone(value string) bool { + return bytes.Equal([]byte(value), SuffixSnapshotTombstone) +} + +// ConstructTombstone used in migration tool also. +func ConstructTombstone() []byte { + return common.CloneByteSlice(SuffixSnapshotTombstone) +} + // SuffixSnapshot implements SnapshotKV // this is a simple replacement for MetaSnapshot, which is not available due to etcd compaction // SuffixSnapshot record timestamp as prefix of a key under the Snapshot prefix path @@ -106,7 +118,7 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps // isTombstone helper function to check whether is tombstone mark func (ss *SuffixSnapshot) isTombstone(value string) bool { - return bytes.Equal([]byte(value), SuffixSnapshotTombstone) + return IsTombstone(value) } // hideRootPrefix helper function to hide root prefix from key @@ -120,11 +132,17 @@ func (ss *SuffixSnapshot) composeSnapshotPrefix(key string) string { return path.Join(ss.snapshotPrefix, key+ss.separator) } +// ComposeSnapshotKey used in migration tool also, in case of any rules change. 
+func ComposeSnapshotKey(snapshotPrefix string, key string, separator string, ts typeutil.Timestamp) string { + // [key][sep][ts] + return path.Join(snapshotPrefix, fmt.Sprintf("%s%s%d", key, separator, ts)) +} + // composeTSKey unified tsKey composing method // uses key, ts and separator to form a key func (ss *SuffixSnapshot) composeTSKey(key string, ts typeutil.Timestamp) string { // [key][sep][ts] - return path.Join(ss.snapshotPrefix, fmt.Sprintf("%s%s%d", key, ss.separator, ts)) + return ComposeSnapshotKey(ss.snapshotPrefix, key, ss.separator, ts) } // isTSKey checks whether a key is in ts-key format diff --git a/internal/metastore/model/collection.go b/internal/metastore/model/collection.go index 8f2b55c318..05f80e4cd3 100644 --- a/internal/metastore/model/collection.go +++ b/internal/metastore/model/collection.go @@ -77,14 +77,6 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection { } } - filedIDToIndexIDs := make([]common.Int64Tuple, len(coll.FieldIndexes)) - for idx, fieldIndexInfo := range coll.FieldIndexes { - filedIDToIndexIDs[idx] = common.Int64Tuple{ - Key: fieldIndexInfo.FiledID, - Value: fieldIndexInfo.IndexID, - } - } - return &Collection{ CollectionID: coll.ID, Name: coll.Schema.Name, @@ -106,6 +98,33 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection { // MarshalCollectionModel marshal only collection-related information. // partitions, aliases and fields won't be marshaled. They should be written to newly path. 
func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo { + return marshalCollectionModelWithConfig(coll, newDefaultConfig()) +} + +type config struct { + withFields bool + withPartitions bool +} + +type Option func(c *config) + +func newDefaultConfig() *config { + return &config{withFields: false, withPartitions: false} +} + +func WithFields() Option { + return func(c *config) { + c.withFields = true + } +} + +func WithPartitions() Option { + return func(c *config) { + c.withPartitions = true + } +} + +func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.CollectionInfo { if coll == nil { return nil } @@ -116,16 +135,12 @@ func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo { AutoID: coll.AutoID, } - partitions := make([]*pb.PartitionInfo, len(coll.Partitions)) - for idx, partition := range coll.Partitions { - partitions[idx] = &pb.PartitionInfo{ - PartitionID: partition.PartitionID, - PartitionName: partition.PartitionName, - PartitionCreatedTimestamp: partition.PartitionCreatedTimestamp, - } + if c.withFields { + fields := MarshalFieldModels(coll.Fields) + collSchema.Fields = fields } - return &pb.CollectionInfo{ + collectionPb := &pb.CollectionInfo{ ID: coll.CollectionID, Schema: collSchema, CreateTime: coll.CreateTime, @@ -137,4 +152,22 @@ func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo { State: coll.State, Properties: coll.Properties, } + + if c.withPartitions { + for _, partition := range coll.Partitions { + collectionPb.PartitionNames = append(collectionPb.PartitionNames, partition.PartitionName) + collectionPb.PartitionIDs = append(collectionPb.PartitionIDs, partition.PartitionID) + collectionPb.PartitionCreatedTimestamps = append(collectionPb.PartitionCreatedTimestamps, partition.PartitionCreatedTimestamp) + } + } + + return collectionPb +} + +func MarshalCollectionModelWithOption(coll *Collection, opts ...Option) *pb.CollectionInfo { + c := newDefaultConfig() + for _, opt := range opts { + opt(c) + 
} + return marshalCollectionModelWithConfig(coll, c) } diff --git a/internal/rootcoord/constrant.go b/internal/rootcoord/constrant.go index bf09dacbcb..2fa6309797 100644 --- a/internal/rootcoord/constrant.go +++ b/internal/rootcoord/constrant.go @@ -2,8 +2,6 @@ package rootcoord const ( // TODO: better to make them configurable, use default value if no config was set since we never explode these before. - snapshotsSep = "_ts" - snapshotPrefix = "snapshots" globalIDAllocatorKey = "idTimestamp" globalIDAllocatorSubPath = "gid" globalTSOAllocatorKey = "timestamp" diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index b88671f301..64f274047a 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -43,12 +43,6 @@ const ( // TimestampPrefix prefix for timestamp TimestampPrefix = rootcoord.ComponentPrefix + "/timestamp" - // DDOperationPrefix prefix for DD operation - DDOperationPrefix = rootcoord.ComponentPrefix + "/dd-operation" - - // DDMsgSendPrefix prefix to indicate whether DD msg has been send - DDMsgSendPrefix = rootcoord.ComponentPrefix + "/dd-msg-send" - // CreateCollectionDDType name of DD type for create collection CreateCollectionDDType = "CreateCollection" diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 5741409655..409c1d96f6 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -350,7 +350,7 @@ func (c *Core) initMetaTable() error { return err } - if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, snapshotsSep, Params.EtcdCfg.MetaRootPath, snapshotPrefix); err != nil { + if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath, kvmetestore.SnapshotPrefix); err != nil { return err } diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index e297f4b351..b24357ab40 100644 --- a/internal/util/paramtable/base_table.go +++ 
b/internal/util/paramtable/base_table.go @@ -82,6 +82,14 @@ type BaseTable struct { YamlFile string } +// NewBaseTableFromYamlOnly only used in migration tool. +// Maybe we shouldn't limit the configDir internally. +func NewBaseTableFromYamlOnly(yaml string) *BaseTable { + mgr, _ := config.Init(config.WithFilesSource(yaml)) + gp := &BaseTable{mgr: mgr, YamlFile: yaml} + return gp +} + // GlobalInitWithYaml initializes the param table with the given yaml. // We will update the global DefaultYaml variable directly, once and for all. // GlobalInitWithYaml shall be called at the very beginning before initiating the base table. diff --git a/internal/util/paramtable/base_table_test.go b/internal/util/paramtable/base_table_test.go index db2a3b06b2..7b2adcedfc 100644 --- a/internal/util/paramtable/base_table_test.go +++ b/internal/util/paramtable/base_table_test.go @@ -283,3 +283,11 @@ func Test_SetLogger(t *testing.T) { assert.Equal(t, true, grpclog.V(2)) }) } + +func TestNewBaseTableFromYamlOnly(t *testing.T) { + var yaml string + var gp *BaseTable + yaml = "not_exist.yaml" + gp = NewBaseTableFromYamlOnly(yaml) + assert.Empty(t, gp.Get("key")) +} diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index 9034034460..62c4982dc0 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -219,6 +219,10 @@ type MetaStoreConfig struct { func (p *MetaStoreConfig) init(base *BaseTable) { p.Base = base + p.LoadCfgToMemory() +} + +func (p *MetaStoreConfig) LoadCfgToMemory() { p.initMetaStoreType() } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index c6dbf47548..0182e8ff2c 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -33,6 +33,17 @@ var GlobalParams paramtable.ComponentParam // SessionEventType session event type type SessionEventType int +func (t SessionEventType) String() 
string { + switch t { + case SessionAddEvent: + return "SessionAddEvent" + case SessionDelEvent: + return "SessionDelEvent" + default: + return "" + } +} + // Rewatch defines the behavior outer session watch handles ErrCompacted // it should process the current full list of session // and returns err if meta error or anything else goes wrong diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 282c380a17..7ef0ec4400 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -685,3 +685,20 @@ func TestSessionProcessActiveStandBy(t *testing.T) { wg.Wait() assert.False(t, s2.isStandby.Load().(bool)) } + +func TestSessionEventType_String(t *testing.T) { + tests := []struct { + name string + t SessionEventType + want string + }{ + {t: SessionNoneEvent, want: ""}, + {t: SessionAddEvent, want: "SessionAddEvent"}, + {t: SessionDelEvent, want: "SessionDelEvent"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, tt.t.String(), "String()") + }) + } +} diff --git a/scripts/proto_gen_go.sh b/scripts/proto_gen_go.sh index e9d3325c5b..508464c3b1 100755 --- a/scripts/proto_gen_go.sh +++ b/scripts/proto_gen_go.sh @@ -52,6 +52,8 @@ mkdir -p datapb mkdir -p querypb mkdir -p planpb +mkdir -p ../../cmd/tools/migration/legacy/legacypb + ${protoc} --proto_path="${GOOGLE_PROTO_DIR}" --proto_path=. \ --go_opt="Mmilvus.proto=github.com/milvus-io/milvus/api/milvuspb;milvuspb" \ --go_opt=Mcommon.proto=github.com/milvus-io/milvus/api/commonpb \ @@ -86,4 +88,7 @@ ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./querypb query_coord. 
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./planpb plan.proto ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./segcorepb segcore.proto +${protoc_opt} --proto_path=../../cmd/tools/migration/legacy/ \ + --go_out=plugins=grpc,paths=source_relative:../../cmd/tools/migration/legacy/legacypb legacy.proto + popd