Add meta migration tool (#19709)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
pull/19725/head
Jiquan Long 2022-10-12 11:37:23 +08:00 committed by GitHub
parent a561614d48
commit a8a074162f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 3008 additions and 86 deletions

View File

@ -79,6 +79,12 @@ binlog:
@source $(PWD)/scripts/setenv.sh && \
mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/tools/binlog/main.go 1>/dev/null
MIGRATION_PATH = $(PWD)/cmd/tools/migration

# migration builds the standalone meta-migration CLI into $(INSTALL_PATH)/migration.
# CGO is forced on to match the other Milvus build targets.
migration:
	@echo "Building migration tool ..."
	@source $(PWD)/scripts/setenv.sh && \
	mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/migration $(MIGRATION_PATH)/main.go 1>/dev/null
BUILD_TAGS = $(shell git describe --tags --always --dirty="-dev")
BUILD_TIME = $(shell date -u)
GIT_COMMIT = $(shell git rev-parse --short HEAD)

View File

@ -0,0 +1,39 @@
package backend
import (
"fmt"
"github.com/blang/semver/v4"
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
"github.com/milvus-io/milvus/cmd/tools/migration/meta"
"github.com/milvus-io/milvus/internal/util"
)
// Backend abstracts the meta storage the migration tool reads from and
// writes to. Implementations are selected per (store type, meta version)
// by NewBackend.
type Backend interface {
	// Load reads all migration-relevant meta from the underlying store.
	Load() (*meta.Meta, error)
	// Save persists the given meta into the underlying store.
	Save(meta *meta.Meta) error
	// Clean removes this version's meta keys from the store.
	Clean() error
	// Backup serializes meta and writes it to backupFile on local disk.
	Backup(meta *meta.Meta, backupFile string) error
	// Restore reads backupFile and writes its key-values back to the store.
	Restore(backupFile string) error
}
// NewBackend returns the Backend implementation matching the configured
// meta store type and the semver meta version string. Mysql is rejected,
// and only 2.1.x / 2.2.x version ranges are recognized.
func NewBackend(cfg *configs.MilvusConfig, version string) (Backend, error) {
	if cfg.MetaStoreCfg.MetaStoreType == util.MetaStoreTypeMysql {
		return nil, fmt.Errorf("%s is not supported now", cfg.MetaStoreCfg.MetaStoreType)
	}
	parsed, err := semver.Parse(version)
	if err != nil {
		return nil, err
	}
	switch {
	case versions.Range21x(parsed):
		return newEtcd210(cfg)
	case versions.Range22x(parsed):
		return newEtcd220(cfg)
	default:
		return nil, fmt.Errorf("version not supported: %s", version)
	}
}

View File

@ -0,0 +1,35 @@
package backend
import "github.com/golang/protobuf/proto"
// BackupHeaderVersion versions the on-disk layout of a backup file.
type BackupHeaderVersion int32

const (
	// BackupHeaderVersionV1 is the first (and currently only) layout version.
	BackupHeaderVersionV1 BackupHeaderVersion = iota
)

// BackupHeader stores etcd backup header information
type BackupHeader struct {
	// Version number for backup format
	Version BackupHeaderVersion `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
	// instance name, as rootPath for key prefix
	Instance string `protobuf:"bytes,2,opt,name=instance,proto3" json:"instance,omitempty"`
	// MetaPath used in keys
	MetaPath string `protobuf:"bytes,3,opt,name=meta_path,proto3" json:"meta_path,omitempty"`
	// Entries record number of key-value in backup
	Entries int64 `protobuf:"varint,4,opt,name=entries,proto3" json:"entries,omitempty"`
	// Component is the backup target
	Component string `protobuf:"bytes,5,opt,name=component,proto3" json:"component,omitempty"`
	// Extra property reserved
	Extra []byte `protobuf:"bytes,6,opt,name=extra,proto3" json:"-"`
}

// Reset clears the header; part of the proto.Message interface.
func (v *BackupHeader) Reset() {
	*v = BackupHeader{}
}

// String renders the header in compact text form; part of proto.Message.
func (v *BackupHeader) String() string {
	return proto.CompactTextString(v)
}

// ProtoMessage marks BackupHeader as a proto.Message so it can be
// (un)marshaled through the protobuf struct tags above.
func (v *BackupHeader) ProtoMessage() {}

View File

@ -0,0 +1,143 @@
package backend
import (
"encoding/binary"
"fmt"
"io"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/api/commonpb"
)
// BackupFile is the in-memory byte image of a backup file:
// an 8-byte little-endian header length, the marshaled BackupHeader, then
// repeated [8-byte little-endian entry length][marshaled KeyDataPair].
type BackupFile []byte

// Reset discards all buffered content.
func (f *BackupFile) Reset() {
	*f = nil
}

// writeBytes appends raw bytes to the file image.
func (f *BackupFile) writeBytes(bs []byte) {
	*f = append(*f, bs...)
}

// writeHeaderLength appends an 8-byte little-endian length prefix.
func (f *BackupFile) writeHeaderLength(l uint64) {
	lengthBytes := make([]byte, 8)
	binary.LittleEndian.PutUint64(lengthBytes, l)
	f.writeBytes(lengthBytes)
}

// writeHeaderBytes appends the marshaled header bytes.
func (f *BackupFile) writeHeaderBytes(header []byte) {
	f.writeBytes(header)
}

// WriteHeader resets the file and writes the length-prefixed marshaled header.
func (f *BackupFile) WriteHeader(header *BackupHeader) error {
	f.Reset()
	marshaledHeader, err := proto.Marshal(header)
	if err != nil {
		return err
	}
	l := len(marshaledHeader)
	f.writeHeaderLength(uint64(l))
	f.writeHeaderBytes(marshaledHeader)
	return nil
}

// writeEntryLength appends an entry length prefix (same 8-byte encoding as
// the header length).
func (f *BackupFile) writeEntryLength(l uint64) {
	f.writeHeaderLength(l)
}

// writeEntryBytes appends the marshaled entry bytes.
func (f *BackupFile) writeEntryBytes(entry []byte) {
	f.writeBytes(entry)
}
// WriteEntry appends one key-value pair, marshaled as a commonpb.KeyDataPair
// preceded by its 8-byte little-endian length.
func (f *BackupFile) WriteEntry(k, v string) error {
	entry := &commonpb.KeyDataPair{
		Key:  k,
		Data: []byte(v),
	}
	marshaledEntry, err := proto.Marshal(entry)
	if err != nil {
		return err
	}
	l := len(marshaledEntry)
	f.writeEntryLength(uint64(l))
	f.writeEntryBytes(marshaledEntry)
	return nil
}
// ReadHeader decodes the BackupHeader at the start of the file and returns
// it with the header's marshaled length (excluding the 8-byte length prefix).
// Bounds are validated before every slice access so truncated files fail
// with a descriptive error rather than a panic.
func (f *BackupFile) ReadHeader() (header *BackupHeader, headerLength uint64, err error) {
	if len(*f) < 8 {
		return nil, 0, fmt.Errorf("invalid backup file, cannot read header length")
	}
	headerLength = binary.LittleEndian.Uint64((*f)[:8])
	if uint64(len(*f)) < 8+headerLength {
		return nil, 0, fmt.Errorf("invalid backup file, cannot read header")
	}
	header = &BackupHeader{}
	// Wrap with %w instead of flattening err.Error() so callers can inspect
	// the underlying proto error via errors.Is/As; message text is unchanged.
	if err := proto.Unmarshal((*f)[8:headerLength+8], header); err != nil {
		return nil, 0, fmt.Errorf("invalid backup file, cannot read header: %w", err)
	}
	return header, headerLength, nil
}
// ReadEntryFromPos decodes the length-prefixed KeyDataPair entry starting at
// byte offset pos. It returns io.EOF when pos is exactly the end of the
// file, which callers use as the normal termination signal.
func (f *BackupFile) ReadEntryFromPos(pos uint64) (entryLength uint64, entry *commonpb.KeyDataPair, err error) {
	if pos == uint64(len(*f)) {
		return 0, nil, io.EOF
	}
	if uint64(len(*f)) < pos+8 {
		return 0, nil, fmt.Errorf("invalid backup file, cannot read entry length")
	}
	entryLength = binary.LittleEndian.Uint64((*f)[pos : pos+8])
	if uint64(len(*f)) < pos+8+entryLength {
		return 0, nil, fmt.Errorf("invalid backup file, cannot read entry")
	}
	entry = &commonpb.KeyDataPair{}
	// %w keeps the proto error inspectable via errors.Is/As; text unchanged.
	if err := proto.Unmarshal((*f)[pos+8:pos+8+entryLength], entry); err != nil {
		return 0, nil, fmt.Errorf("invalid backup file, cannot read entry: %w", err)
	}
	return entryLength, entry, nil
}
// DeSerialize decodes the whole file image into its header and a map of all
// key-value entries. The entry loop terminates when ReadEntryFromPos reports
// io.EOF at the exact end of the buffer.
func (f *BackupFile) DeSerialize() (header *BackupHeader, kvs map[string]string, err error) {
	pos := uint64(0)
	header, headerLength, err := f.ReadHeader()
	if err != nil {
		return nil, nil, err
	}
	// Skip the 8-byte length prefix plus the marshaled header.
	pos += 8 + headerLength
	kvs = make(map[string]string)
	for {
		entryLength, entry, err := f.ReadEntryFromPos(pos)
		if err == io.EOF {
			return header, kvs, nil
		}
		if err != nil {
			return nil, nil, err
		}
		kvs[entry.GetKey()] = string(entry.GetData())
		pos += 8 + entryLength
	}
}
// BackupCodec converts between (header, kvs) and the flat BackupFile image.
type BackupCodec struct{}

// Serialize writes header followed by every kvs entry into a new BackupFile.
// NOTE: it mutates header.Entries to the number of key-values written.
// Map iteration order is random, so entry order in the file is not
// deterministic; DeSerialize rebuilds a map, so round-trips are unaffected.
func (c *BackupCodec) Serialize(header *BackupHeader, kvs map[string]string) (BackupFile, error) {
	file := make(BackupFile, 0)
	header.Entries = int64(len(kvs))
	if err := file.WriteHeader(header); err != nil {
		return nil, err
	}
	for k, v := range kvs {
		if err := file.WriteEntry(k, v); err != nil {
			return nil, err
		}
	}
	return file, nil
}

// DeSerialize is the inverse of Serialize.
func (c *BackupCodec) DeSerialize(file BackupFile) (header *BackupHeader, kvs map[string]string, err error) {
	return file.DeSerialize()
}

// NewBackupCodec creates a stateless backup codec.
func NewBackupCodec() *BackupCodec {
	return &BackupCodec{}
}

View File

@ -0,0 +1,31 @@
package backend
import (
"reflect"
"testing"
"github.com/stretchr/testify/assert"
)
// TestBackupCodec_Serialize round-trips a header plus a few key-values
// through Serialize/DeSerialize and expects exact equality. Note that
// Serialize mutates header.Entries (0 -> 3 here), which is why DeepEqual on
// the header still passes.
func TestBackupCodec_Serialize(t *testing.T) {
	header := &BackupHeader{
		Version:   BackupHeaderVersionV1,
		Instance:  "/by-dev",
		MetaPath:  "meta",
		Entries:   0,
		Component: "",
		Extra:     nil,
	}
	kvs := map[string]string{
		"1": "1",
		"2": "2",
		"3": "3",
	}
	codec := &BackupCodec{}
	file, err := codec.Serialize(header, kvs)
	assert.NoError(t, err)
	gotHeader, gotEntries, err := codec.DeSerialize(file)
	assert.NoError(t, err)
	assert.True(t, reflect.DeepEqual(header, gotHeader))
	assert.True(t, reflect.DeepEqual(kvs, gotEntries))
}

View File

@ -0,0 +1,29 @@
package backend
import (
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/etcd"
clientv3 "go.etcd.io/etcd/client/v3"
)
// etcdBasedBackend holds the etcd client and MetaKv shared by the versioned
// etcd backends (etcd210, etcd220).
type etcdBasedBackend struct {
	cfg     *configs.MilvusConfig
	txn     kv.MetaKv        // kv view rooted at the configured meta root path
	etcdCli *clientv3.Client // raw client; kept for lifetime management
}

// CleanWithPrefix removes every key under the given prefix (relative to the
// configured meta root path).
func (b etcdBasedBackend) CleanWithPrefix(prefix string) error {
	return b.txn.RemoveWithPrefix(prefix)
}

// newEtcdBasedBackend connects to etcd as configured and wraps the client in
// a MetaKv rooted at cfg.EtcdCfg.MetaRootPath.
func newEtcdBasedBackend(cfg *configs.MilvusConfig) (*etcdBasedBackend, error) {
	etcdCli, err := etcd.GetEtcdClient(cfg.EtcdCfg)
	if err != nil {
		return nil, err
	}
	txn := etcdkv.NewEtcdKV(etcdCli, cfg.EtcdCfg.MetaRootPath)
	b := &etcdBasedBackend{cfg: cfg, etcdCli: etcdCli, txn: txn}
	return b, nil
}

View File

@ -0,0 +1,417 @@
package backend
import (
"fmt"
"io/ioutil"
"path"
"strconv"
"strings"
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
"github.com/milvus-io/milvus/cmd/tools/migration/legacy"
"github.com/milvus-io/milvus/cmd/tools/migration/legacy/legacypb"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/cmd/tools/migration/console"
"github.com/milvus-io/milvus/cmd/tools/migration/meta"
"github.com/milvus-io/milvus/cmd/tools/migration/utils"
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// etcd210 implements Backend.
// It reads the pre-2.2.0 ("2.1.x") etcd meta layout.
// NOTE(review): the embedded Backend interface is left nil, so calling any
// method not defined on etcd210 (e.g. Save) would nil-panic — confirm this
// is intentional.
type etcd210 struct {
	Backend
	*etcdBasedBackend
}

// newEtcd210 builds a 2.1.x backend on top of a fresh etcd connection.
func newEtcd210(cfg *configs.MilvusConfig) (*etcd210, error) {
	etcdBackend, err := newEtcdBasedBackend(cfg)
	if err != nil {
		return nil, err
	}
	return &etcd210{etcdBasedBackend: etcdBackend}, nil
}
// loadTtAliases loads every timestamp-snapshotted alias record found under
// SnapshotPrefix + CollectionAliasMetaPrefix210. Each key is split into
// (key, ts) by utils.SplitBySeparator; a tombstone value is recorded as a
// nil CollectionInfo for that (alias, ts).
func (b etcd210) loadTtAliases() (meta.TtAliasesMeta210, error) {
	ttAliases := make(meta.TtAliasesMeta210)
	prefix := path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionAliasMetaPrefix210)
	keys, values, err := b.txn.LoadWithPrefix(prefix)
	if err != nil {
		return nil, err
	}
	// LoadWithPrefix should return parallel slices; guard anyway.
	if len(keys) != len(values) {
		return nil, fmt.Errorf("length mismatch")
	}
	l := len(keys)
	for i := 0; i < l; i++ {
		tsKey := keys[i]
		tsValue := values[i]
		valueIsTombstone := rootcoord.IsTombstone(tsValue)
		var aliasInfo = &pb.CollectionInfo{} // alias stored in collection info.
		if valueIsTombstone {
			aliasInfo = nil
		} else {
			if err := proto.Unmarshal([]byte(tsValue), aliasInfo); err != nil {
				return nil, err
			}
		}
		key, ts, err := utils.SplitBySeparator(tsKey)
		if err != nil {
			return nil, err
		}
		// The alias name is the last path segment of the key.
		ttAliases.AddAlias(utils.GetFileName(key), aliasInfo, ts)
	}
	return ttAliases, nil
}
// loadAliases loads the live (non-snapshot) alias records under
// CollectionAliasMetaPrefix210. Tombstoned values are recorded as a nil
// CollectionInfo.
func (b etcd210) loadAliases() (meta.AliasesMeta210, error) {
	aliases := make(meta.AliasesMeta210)
	prefix := rootcoord.CollectionAliasMetaPrefix210
	keys, values, err := b.txn.LoadWithPrefix(prefix)
	if err != nil {
		return nil, err
	}
	if len(keys) != len(values) {
		return nil, fmt.Errorf("length mismatch")
	}
	l := len(keys)
	for i := 0; i < l; i++ {
		key := keys[i]
		value := values[i]
		valueIsTombstone := rootcoord.IsTombstone(value)
		var aliasInfo = &pb.CollectionInfo{} // alias stored in collection info.
		if valueIsTombstone {
			aliasInfo = nil
		} else {
			if err := proto.Unmarshal([]byte(value), aliasInfo); err != nil {
				return nil, err
			}
		}
		// The alias name is the last path segment of the key.
		aliases.AddAlias(utils.GetFileName(key), aliasInfo)
	}
	return aliases, nil
}
// loadTtCollections loads timestamp-snapshotted collection records. Keys
// that belong to the alias prefix are skipped because aliases and
// collections share the same key prefix in the 2.1.x layout. Tombstones are
// recorded as nil CollectionInfo.
func (b etcd210) loadTtCollections() (meta.TtCollectionsMeta210, error) {
	ttCollections := make(meta.TtCollectionsMeta210)
	prefix := path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionMetaPrefix)
	keys, values, err := b.txn.LoadWithPrefix(prefix)
	if err != nil {
		return nil, err
	}
	if len(keys) != len(values) {
		return nil, fmt.Errorf("length mismatch")
	}
	l := len(keys)
	for i := 0; i < l; i++ {
		tsKey := keys[i]
		tsValue := values[i]
		// ugly here, since alias and collections have same prefix.
		if strings.Contains(tsKey, rootcoord.CollectionAliasMetaPrefix210) {
			continue
		}
		valueIsTombstone := rootcoord.IsTombstone(tsValue)
		var coll = &pb.CollectionInfo{}
		if valueIsTombstone {
			coll = nil
		} else {
			if err := proto.Unmarshal([]byte(tsValue), coll); err != nil {
				return nil, err
			}
		}
		key, ts, err := utils.SplitBySeparator(tsKey)
		if err != nil {
			return nil, err
		}
		// The last path segment of the key is the numeric collection ID.
		collectionID, err := strconv.Atoi(utils.GetFileName(key))
		if err != nil {
			return nil, err
		}
		ttCollections.AddCollection(typeutil.UniqueID(collectionID), coll, ts)
	}
	return ttCollections, nil
}
// loadCollections loads the live collection records under
// CollectionMetaPrefix, skipping alias keys (shared prefix, see
// loadTtCollections). Tombstones are recorded as nil CollectionInfo.
func (b etcd210) loadCollections() (meta.CollectionsMeta210, error) {
	collections := make(meta.CollectionsMeta210)
	prefix := rootcoord.CollectionMetaPrefix
	keys, values, err := b.txn.LoadWithPrefix(prefix)
	if err != nil {
		return nil, err
	}
	if len(keys) != len(values) {
		return nil, fmt.Errorf("length mismatch")
	}
	l := len(keys)
	for i := 0; i < l; i++ {
		key := keys[i]
		value := values[i]
		// ugly here, since alias and collections have same prefix.
		if strings.Contains(key, rootcoord.CollectionAliasMetaPrefix210) {
			continue
		}
		valueIsTombstone := rootcoord.IsTombstone(value)
		var coll = &pb.CollectionInfo{}
		if valueIsTombstone {
			coll = nil
		} else {
			if err := proto.Unmarshal([]byte(value), coll); err != nil {
				return nil, err
			}
		}
		// The last path segment of the key is the numeric collection ID.
		collectionID, err := strconv.Atoi(utils.GetFileName(key))
		if err != nil {
			return nil, err
		}
		collections.AddCollection(typeutil.UniqueID(collectionID), coll)
	}
	return collections, nil
}
// parseCollectionIndexKey extracts the collection ID and index ID from an
// index meta key whose last two path segments are
// ".../<collectionID>/<indexID>".
// The numeric-parse errors are wrapped with the offending key so a corrupt
// meta key can be located from the error message alone.
func parseCollectionIndexKey(key string) (collectionID, indexID typeutil.UniqueID, err error) {
	ss := strings.Split(key, "/")
	l := len(ss)
	if l < 2 {
		return 0, 0, fmt.Errorf("failed to parse collection index key: %s", key)
	}
	index, err := strconv.Atoi(ss[l-1])
	if err != nil {
		return 0, 0, fmt.Errorf("failed to parse index id in key %s: %w", key, err)
	}
	collection, err := strconv.Atoi(ss[l-2])
	if err != nil {
		return 0, 0, fmt.Errorf("failed to parse collection id in key %s: %w", key, err)
	}
	return typeutil.UniqueID(collection), typeutil.UniqueID(index), nil
}
// loadCollectionIndexes loads pre-2.2.0 collection index meta under
// IndexMetaBefore220Prefix; collection and index IDs come from the key path.
func (b etcd210) loadCollectionIndexes() (meta.CollectionIndexesMeta210, error) {
	collectionIndexes := make(meta.CollectionIndexesMeta210)
	prefix := legacy.IndexMetaBefore220Prefix
	keys, values, err := b.txn.LoadWithPrefix(prefix)
	if err != nil {
		return nil, err
	}
	if len(keys) != len(values) {
		return nil, fmt.Errorf("length mismatch")
	}
	l := len(keys)
	for i := 0; i < l; i++ {
		key := keys[i]
		value := values[i]
		var index = &pb.IndexInfo{}
		if err := proto.Unmarshal([]byte(value), index); err != nil {
			return nil, err
		}
		collectionID, indexID, err := parseCollectionIndexKey(key)
		if err != nil {
			return nil, err
		}
		collectionIndexes.AddIndex(collectionID, indexID, index)
	}
	return collectionIndexes, nil
}
// loadSegmentIndexes loads pre-2.2.0 segment index meta under
// SegmentIndexPrefixBefore220. The keys are ignored: segment and index IDs
// are taken from the unmarshaled value itself.
func (b etcd210) loadSegmentIndexes() (meta.SegmentIndexesMeta210, error) {
	segmentIndexes := make(meta.SegmentIndexesMeta210)
	prefix := legacy.SegmentIndexPrefixBefore220
	keys, values, err := b.txn.LoadWithPrefix(prefix)
	if err != nil {
		return nil, err
	}
	if len(keys) != len(values) {
		return nil, fmt.Errorf("length mismatch")
	}
	l := len(keys)
	for i := 0; i < l; i++ {
		value := values[i]
		var index = &pb.SegmentIndexInfo{}
		if err := proto.Unmarshal([]byte(value), index); err != nil {
			return nil, err
		}
		segmentIndexes.AddIndex(index.GetSegmentID(), index.GetIndexID(), index)
	}
	return segmentIndexes, nil
}
// loadIndexBuildMeta loads pre-2.2.0 index build records under
// IndexBuildPrefixBefore220, keyed by the IndexBuildID inside each record.
func (b etcd210) loadIndexBuildMeta() (meta.IndexBuildMeta210, error) {
	indexBuildMeta := make(meta.IndexBuildMeta210)
	prefix := legacy.IndexBuildPrefixBefore220
	keys, values, err := b.txn.LoadWithPrefix(prefix)
	if err != nil {
		return nil, err
	}
	if len(keys) != len(values) {
		return nil, fmt.Errorf("length mismatch")
	}
	l := len(keys)
	for i := 0; i < l; i++ {
		value := values[i]
		var record = &legacypb.IndexMeta{}
		if err := proto.Unmarshal([]byte(value), record); err != nil {
			return nil, err
		}
		indexBuildMeta.AddRecord(record.GetIndexBuildID(), record)
	}
	return indexBuildMeta, nil
}
// loadLastDDLRecords collects the legacy dd-operation and dd-msg-send
// records, from both the live prefixes and their snapshot copies, as raw
// key-value pairs.
func (b etcd210) loadLastDDLRecords() (meta.LastDDLRecords, error) {
	records := make(meta.LastDDLRecords)
	prefixes := []string{
		legacy.DDOperationPrefixBefore220,
		legacy.DDMsgSendPrefixBefore220,
		path.Join(rootcoord.SnapshotPrefix, legacy.DDOperationPrefixBefore220),
		path.Join(rootcoord.SnapshotPrefix, legacy.DDMsgSendPrefixBefore220),
	}
	for _, prefix := range prefixes {
		keys, values, err := b.txn.LoadWithPrefix(prefix)
		if err != nil {
			return nil, err
		}
		if len(keys) != len(values) {
			return nil, fmt.Errorf("length mismatch")
		}
		for i, k := range keys {
			records.AddRecord(k, values[i])
		}
	}
	return records, nil
}
// Load reads every 2.1.x meta section from etcd and assembles them into a
// single Meta tagged with Version210. The first section that fails to load
// aborts the whole operation.
func (b etcd210) Load() (*meta.Meta, error) {
	ttCollections, err := b.loadTtCollections()
	if err != nil {
		return nil, err
	}
	collections, err := b.loadCollections()
	if err != nil {
		return nil, err
	}
	ttAliases, err := b.loadTtAliases()
	if err != nil {
		return nil, err
	}
	aliases, err := b.loadAliases()
	if err != nil {
		return nil, err
	}
	collectionIndexes, err := b.loadCollectionIndexes()
	if err != nil {
		return nil, err
	}
	segmentIndexes, err := b.loadSegmentIndexes()
	if err != nil {
		return nil, err
	}
	indexBuildMeta, err := b.loadIndexBuildMeta()
	if err != nil {
		return nil, err
	}
	lastDdlRecords, err := b.loadLastDDLRecords()
	if err != nil {
		return nil, err
	}
	return &meta.Meta{
		Version: versions.Version210,
		Meta210: &meta.All210{
			TtCollections:     ttCollections,
			Collections:       collections,
			TtAliases:         ttAliases,
			Aliases:           aliases,
			CollectionIndexes: collectionIndexes,
			SegmentIndexes:    segmentIndexes,
			IndexBuildMeta:    indexBuildMeta,
			LastDDLRecords:    lastDdlRecords,
		},
	}, nil
}
// lineCleanPrefix prints a one-line console notice that a meta prefix is
// being removed.
func lineCleanPrefix(prefix string) {
	fmt.Println("prefix " + prefix + " will be removed!")
}
// Clean removes every 2.1.x meta prefix, both the live keys and their
// snapshot copies, printing each prefix after it has been removed. The
// first removal failure aborts and leaves later prefixes untouched.
func (b etcd210) Clean() error {
	prefixes := []string{
		rootcoord.CollectionMetaPrefix,
		path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionMetaPrefix),
		rootcoord.CollectionAliasMetaPrefix210,
		path.Join(rootcoord.SnapshotPrefix, rootcoord.CollectionAliasMetaPrefix210),
		legacy.SegmentIndexPrefixBefore220,
		legacy.IndexMetaBefore220Prefix,
		legacy.IndexBuildPrefixBefore220,
		legacy.DDMsgSendPrefixBefore220,
		path.Join(rootcoord.SnapshotPrefix, legacy.DDMsgSendPrefixBefore220),
		legacy.DDOperationPrefixBefore220,
		path.Join(rootcoord.SnapshotPrefix, legacy.DDOperationPrefixBefore220),
	}
	for _, prefix := range prefixes {
		if err := b.CleanWithPrefix(prefix); err != nil {
			return err
		}
		lineCleanPrefix(prefix)
	}
	return nil
}
// Backup serializes the given 2.1.x meta into the backup file format and
// writes it to backupFile. The configured meta root path is split so the
// last path segment becomes MetaPath and everything before it becomes
// Instance in the backup header; a single-segment root path is stored as
// Instance with an empty MetaPath.
func (b etcd210) Backup(meta *meta.Meta, backupFile string) error {
	saves := meta.Meta210.GenerateSaves()
	codec := NewBackupCodec()
	var instance, metaPath string
	metaRootPath := b.cfg.EtcdCfg.MetaRootPath
	parts := strings.Split(metaRootPath, "/")
	if len(parts) > 1 {
		metaPath = parts[len(parts)-1]
		instance = path.Join(parts[:len(parts)-1]...)
	} else {
		instance = metaRootPath
	}
	header := &BackupHeader{
		Version:   BackupHeaderVersionV1,
		Instance:  instance,
		MetaPath:  metaPath,
		Entries:   int64(len(saves)),
		Component: "",
		Extra:     nil,
	}
	backup, err := codec.Serialize(header, saves)
	if err != nil {
		return err
	}
	console.Warning(fmt.Sprintf("backup to: %s", backupFile))
	// 0600: the backup may contain sensitive meta; restrict to the owner.
	return ioutil.WriteFile(backupFile, backup, 0600)
}
// Restore reads a backup file produced by Backup and writes every key-value
// back into etcd. Writes happen one key at a time (not transactional), so a
// mid-way failure leaves a partially restored store.
func (b etcd210) Restore(backupFile string) error {
	backup, err := ioutil.ReadFile(backupFile)
	if err != nil {
		return err
	}
	codec := NewBackupCodec()
	_, saves, err := codec.DeSerialize(backup)
	if err != nil {
		return err
	}
	for k, v := range saves {
		if err := b.txn.Save(k, v); err != nil {
			return err
		}
	}
	return nil
}

View File

@ -0,0 +1,123 @@
package backend
import (
"fmt"
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/cmd/tools/migration/meta"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
)
// etcd220 implements Backend.
// It writes the 2.2.x etcd meta layout.
// NOTE(review): as with etcd210, the embedded Backend interface stays nil;
// methods not defined on etcd220 (e.g. Load/Backup/Restore) would nil-panic.
type etcd220 struct {
	Backend
	*etcdBasedBackend
}

// newEtcd220 builds a 2.2.x backend on top of a fresh etcd connection.
func newEtcd220(cfg *configs.MilvusConfig) (*etcd220, error) {
	etcdBackend, err := newEtcdBasedBackend(cfg)
	if err != nil {
		return nil, err
	}
	return &etcd220{etcdBasedBackend: etcdBackend}, nil
}
// lineSaveTo prints a one-line console notice for a key about to be saved.
func lineSaveTo(key string) {
	fmt.Println("save to " + key)
}

// printSaves logs every key contained in a save batch.
func printSaves(saves map[string]string) {
	for key := range saves {
		lineSaveTo(key)
	}
}
// save writes each key-value into etcd one by one; the first failed write
// aborts (not transactional).
func (b etcd220) save(saves map[string]string) error {
	for k, v := range saves {
		if err := b.txn.Save(k, v); err != nil {
			return err
		}
	}
	return nil
}
// Save persists every 2.2.0-format meta section into etcd: tt-collections,
// collections, tt-aliases, aliases, collection indexes, then segment
// indexes, in that order. The first generation or write error aborts.
func (b etcd220) Save(metas *meta.Meta) error {
	generators := []func() (map[string]string, error){
		func() (map[string]string, error) { return metas.Meta220.TtCollections.GenerateSaves(metas.SourceVersion) },
		func() (map[string]string, error) { return metas.Meta220.Collections.GenerateSaves(metas.SourceVersion) },
		func() (map[string]string, error) { return metas.Meta220.TtAliases.GenerateSaves() },
		func() (map[string]string, error) { return metas.Meta220.Aliases.GenerateSaves() },
		func() (map[string]string, error) { return metas.Meta220.CollectionIndexes.GenerateSaves() },
		func() (map[string]string, error) { return metas.Meta220.SegmentIndexes.GenerateSaves() },
	}
	for _, generate := range generators {
		saves, err := generate()
		if err != nil {
			return err
		}
		if err := b.save(saves); err != nil {
			return err
		}
	}
	return nil
}
// Clean removes every 2.2.x meta prefix, printing each prefix after it has
// been removed. The first removal failure aborts.
func (b etcd220) Clean() error {
	prefixes := []string{
		rootcoord.CollectionMetaPrefix,
		rootcoord.PartitionMetaPrefix,
		rootcoord.FieldMetaPrefix,
		rootcoord.AliasMetaPrefix,
		util.FieldIndexPrefix,
		util.SegmentIndexPrefix,
	}
	for _, prefix := range prefixes {
		if err := b.CleanWithPrefix(prefix); err != nil {
			return err
		}
		lineCleanPrefix(prefix)
	}
	return nil
}

View File

@ -0,0 +1,20 @@
package command
import (
"context"
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
"github.com/milvus-io/milvus/cmd/tools/migration/console"
"github.com/milvus-io/milvus/cmd/tools/migration/migration"
)
// Backup registers a migration session and dumps the current meta to the
// configured backup file. Sessions are checked before and after
// registration to guard against a concurrently running Milvus or another
// migration; any failure exits the process via console.ExitIf.
func Backup(c *configs.Config) {
	ctx := context.Background()
	runner := migration.NewRunner(ctx, c)
	console.ExitIf(runner.CheckSessions())
	console.ExitIf(runner.RegisterSession())
	defer runner.Stop()
	// double check.
	console.ExitIf(runner.CheckSessions())
	console.ExitIf(runner.Backup())
}

View File

@ -0,0 +1,24 @@
package command
import (
"flag"
"github.com/milvus-io/milvus/cmd/tools/migration/console"
)
// commandParser holds the CLI flags of the migration tool.
type commandParser struct {
	configYaml string // path to the migration config yaml (-config flag)
}

// formatYaml declares the -config flag on the flag set; args is currently
// unused and kept for signature symmetry with parse.
func (c *commandParser) formatYaml(args []string, flags *flag.FlagSet) {
	flags.StringVar(&c.configYaml, "config", "", "set config yaml")
}

// parse parses all declared flags from args[1:], exiting the process on a
// parse error.
func (c *commandParser) parse(args []string, flags *flag.FlagSet) {
	console.ExitIf(flags.Parse(args[1:]))
}

// format declares, then parses, all flags.
func (c *commandParser) format(args []string, flags *flag.FlagSet) {
	c.formatYaml(args, flags)
	c.parse(args, flags)
}

View File

@ -0,0 +1,12 @@
package command
import "fmt"
var (
	// usageLineV2 is the text printed by the flag set's Usage function.
	usageLineV2 = fmt.Sprintf("Usage:\n"+
		"%s\n", runLineV2)
	// runLineV2 shows the canonical invocation of the tool.
	runLineV2 = `
migration -config=config.yaml
`
)

View File

@ -0,0 +1,35 @@
package command
import (
"flag"
"fmt"
"os"
"github.com/milvus-io/milvus/cmd/tools/migration/console"
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
)
// Execute is the CLI entry point: it parses flags, loads the yaml config,
// and dispatches to the sub-command selected by cmd.type in the yaml
// (run / backup / rollback). Missing -config or an unknown cmd exits the
// process.
func Execute(args []string) {
	flags := flag.NewFlagSet(args[0], flag.ExitOnError)
	flags.Usage = func() {
		fmt.Fprintln(os.Stderr, usageLineV2)
	}
	c := &commandParser{}
	c.format(args, flags)
	console.ErrorExitIf(c.configYaml == "", "config not set")
	cfg := configs.NewConfig(c.configYaml)
	switch cfg.Cmd {
	case configs.RunCmd:
		Run(cfg)
	case configs.BackupCmd:
		Backup(cfg)
	case configs.RollbackCmd:
		Rollback(cfg)
	default:
		console.Exit(fmt.Sprintf("cmd not set or not supported: %s", cfg.Cmd))
	}
}

View File

@ -0,0 +1,20 @@
package command
import (
"context"
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
"github.com/milvus-io/milvus/cmd/tools/migration/console"
"github.com/milvus-io/milvus/cmd/tools/migration/migration"
)
// Rollback registers a migration session and rolls the meta back via the
// runner. Sessions are checked before and after registration to guard
// against concurrent users; any failure exits the process.
func Rollback(c *configs.Config) {
	ctx := context.Background()
	runner := migration.NewRunner(ctx, c)
	console.ExitIf(runner.CheckSessions())
	console.ExitIf(runner.RegisterSession())
	defer runner.Stop()
	// double check.
	console.ExitIf(runner.CheckSessions())
	console.ExitIf(runner.Rollback())
}

View File

@ -0,0 +1,24 @@
package command
import (
"context"
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
"github.com/milvus-io/milvus/cmd/tools/migration/console"
"github.com/milvus-io/milvus/cmd/tools/migration/migration"
)
// Run registers a migration session, validates the configuration, and runs
// the migration. If source and target versions are already compatible the
// process exits successfully without migrating; any failure exits with an
// error.
func Run(c *configs.Config) {
	ctx := context.Background()
	runner := migration.NewRunner(ctx, c)
	console.ExitIf(runner.CheckSessions())
	console.ExitIf(runner.RegisterSession())
	defer runner.Stop()
	// double check.
	console.ExitIf(runner.CheckSessions())
	console.ExitIf(runner.Validate())
	console.NormalExitIf(runner.CheckCompatible(), "version compatible, no need to migrate")
	console.ExitIf(runner.Migrate())
}

View File

@ -0,0 +1,98 @@
package configs
import (
"fmt"
"github.com/milvus-io/milvus/cmd/tools/migration/console"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// Supported values for the cmd.type key in the migration yaml.
const (
	RunCmd      = "run"
	BackupCmd   = "backup"
	RollbackCmd = "rollback"
)

// RunConfig carries the migration-specific settings read from the yaml.
type RunConfig struct {
	base *paramtable.BaseTable
	// Cmd is one of RunCmd/BackupCmd/RollbackCmd (yaml key cmd.type).
	Cmd string
	// SourceVersion is the meta version migrated from (config.sourceVersion).
	SourceVersion string
	// TargetVersion is the meta version migrated to (config.targetVersion).
	TargetVersion string
	// BackupFilePath is where backups are written/read (config.backupFilePath).
	BackupFilePath string
}
// newRunConfig builds a RunConfig from an already-loaded BaseTable.
func newRunConfig(base *paramtable.BaseTable) *RunConfig {
	c := &RunConfig{}
	c.init(base)
	return c
}

// String renders the run config for console display.
func (c *RunConfig) String() string {
	return fmt.Sprintf("Cmd: %s, SourceVersion: %s, TargetVersion: %s, BackupFilePath: %s",
		c.Cmd, c.SourceVersion, c.TargetVersion, c.BackupFilePath)
}

// show echoes the parsed config to the console.
func (c *RunConfig) show() {
	console.Warning(c.String())
}

// init reads the cmd.*/config.* yaml keys (each defaulting to "" when
// absent) and prints the resulting config.
func (c *RunConfig) init(base *paramtable.BaseTable) {
	c.base = base
	c.Cmd = c.base.LoadWithDefault("cmd.type", "")
	c.SourceVersion = c.base.LoadWithDefault("config.sourceVersion", "")
	c.TargetVersion = c.base.LoadWithDefault("config.targetVersion", "")
	c.BackupFilePath = c.base.LoadWithDefault("config.backupFilePath", "")
	c.show()
}
// MilvusConfig mirrors the subset of the Milvus server configuration the
// migration tool needs: meta store type plus etcd and mysql settings.
type MilvusConfig struct {
	MetaStoreCfg *paramtable.MetaStoreConfig
	EtcdCfg      *paramtable.EtcdConfig
	MysqlCfg     *paramtable.MetaDBConfig
}

// newMilvusConfig loads the Milvus-side config from an existing BaseTable.
func newMilvusConfig(base *paramtable.BaseTable) *MilvusConfig {
	c := &MilvusConfig{}
	c.init(base)
	return c
}

// init loads the meta store config, additionally the mysql config when the
// store type is mysql, and always the etcd config.
func (c *MilvusConfig) init(base *paramtable.BaseTable) {
	c.MetaStoreCfg = &paramtable.MetaStoreConfig{}
	c.EtcdCfg = &paramtable.EtcdConfig{}
	c.MysqlCfg = &paramtable.MetaDBConfig{}
	c.MetaStoreCfg.Base = base
	c.MetaStoreCfg.LoadCfgToMemory()
	switch c.MetaStoreCfg.MetaStoreType {
	case util.MetaStoreTypeMysql:
		c.MysqlCfg.Base = base
		c.MysqlCfg.LoadCfgToMemory()
	default:
	}
	c.EtcdCfg.Base = base
	c.EtcdCfg.LoadCfgToMemory()
}
// Config is the full migration-tool configuration: run settings plus Milvus
// connection settings, both loaded from a single yaml file.
type Config struct {
	base *paramtable.BaseTable
	*RunConfig
	*MilvusConfig
}

// init loads the yaml into a BaseTable and derives both sub-configs from it.
func (c *Config) init(yamlFile string) {
	c.base = paramtable.NewBaseTableFromYamlOnly(yamlFile)
	c.RunConfig = newRunConfig(c.base)
	c.MilvusConfig = newMilvusConfig(c.base)
}

// NewConfig loads the migration configuration from the given yaml file.
func NewConfig(yamlFile string) *Config {
	c := &Config{}
	c.init(yamlFile)
	return c
}

View File

@ -0,0 +1,57 @@
package console
import (
"fmt"
"os"
"github.com/mgutz/ansi"
)
// Success prints msg to stdout in green.
func Success(msg string) {
	colorOut(msg, "green")
}

// Error prints msg to stdout in red.
func Error(msg string) {
	colorOut(msg, "red")
}

// Warning prints msg to stdout in yellow.
func Warning(msg string) {
	colorOut(msg, "yellow")
}

// Exit prints msg in red and terminates the process with exit code 1.
func Exit(msg string) {
	Error(msg)
	os.Exit(1)
}

// ExitIf terminates the process (exit code 1) when err is non-nil.
func ExitIf(err error) {
	if err != nil {
		Exit(err.Error())
	}
}

// NormalExit prints msg in green and terminates with exit code 0.
func NormalExit(msg string) {
	Success(msg)
	os.Exit(0)
}

// NormalExitIf exits successfully with msg when success is true.
func NormalExitIf(success bool, msg string) {
	if success {
		NormalExit(msg)
	}
}

// ErrorExit prints msg and terminates with exit code 1.
// NOTE(review): this prints in yellow (Warning) rather than red (Error) —
// confirm whether that is intentional.
func ErrorExit(msg string) {
	Warning(msg)
	os.Exit(1)
}

// ErrorExitIf calls ErrorExit with msg when fail is true.
func ErrorExitIf(fail bool, msg string) {
	if fail {
		ErrorExit(msg)
	}
}

// colorOut writes message to stdout wrapped in the named ANSI color.
func colorOut(message, color string) {
	fmt.Fprintln(os.Stdout, ansi.Color(message, color))
}

View File

@ -0,0 +1,21 @@
package console
import (
"testing"
)
// Smoke tests: they only verify the console helpers do not panic; output is
// not captured or asserted.
func TestSuccess(t *testing.T) {
	Success("success")
}

func TestError(t *testing.T) {
	Error("error")
}

func TestWarning(t *testing.T) {
	Warning("warning")
}

// TestExitIf passes a nil error, so the process must not exit.
func TestExitIf(t *testing.T) {
	ExitIf(nil)
}

View File

@ -0,0 +1,44 @@
cmd:
# Option: run/backup/rollback
type: run
config:
sourceVersion: 2.1.0
targetVersion: 2.2.0
backupFilePath: /tmp/migration.bak
metastore:
type: etcd
etcd:
endpoints:
- localhost:2379
rootPath: by-dev # The root path where data is stored in etcd
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
log:
# path is one of:
# - "default" as os.Stderr,
# - "stderr" as os.Stderr,
# - "stdout" as os.Stdout,
# - file path to append server logs to.
# please adjust in embedded Milvus: /tmp/milvus/logs/etcd.log
path: stdout
level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
use:
# please adjust in embedded Milvus: true
embed: false # Whether to enable embedded Etcd (an in-process EtcdServer).
data:
# Embedded Etcd only.
# please adjust in embedded Milvus: /tmp/milvus/etcdData/
dir: default.etcd
ssl:
enabled: false # Whether to support ETCD secure connection mode
tlsCert: /path/to/etcd-client.pem # path to your cert file
tlsKey: /path/to/etcd-client-key.pem # path to your key file
tlsCACert: /path/to/ca.pem # path to your CACert file
# TLS min version
# Optional values: 1.0, 1.1, 1.2, 1.3.
# We recommend using version 1.2 and above
tlsMinVersion: 1.3

View File

@ -0,0 +1,11 @@
package legacy
import "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
const (
	// Pre-2.2.0 etcd key prefixes, retained only so the migration tool can
	// read and clean up the legacy meta layout.
	DDOperationPrefixBefore220  = rootcoord.ComponentPrefix + "/dd-operation"
	DDMsgSendPrefixBefore220    = rootcoord.ComponentPrefix + "/dd-msg-send"
	IndexMetaBefore220Prefix    = rootcoord.ComponentPrefix + "/index"
	SegmentIndexPrefixBefore220 = rootcoord.ComponentPrefix + "/segment-index"
	// IndexBuildPrefixBefore220 is not under ComponentPrefix, unlike the others.
	IndexBuildPrefixBefore220 = "indexes"
)

View File

@ -0,0 +1,35 @@
syntax = "proto3";
package milvus.proto.legacy;
option go_package = "github.com/milvus-io/milvus/internal/proto/legacypb";
import "common.proto";
import "schema.proto";
/********************************** Index Build Meta Before Version 220 ***************************************/
// BuildIndexRequest is the pre-2.2.0 persisted index build request.
// Field number 4 is absent in the original definition and must stay unused
// for wire compatibility with historical data.
message BuildIndexRequest {
  int64 indexBuildID = 1;
  string index_name = 2;
  int64 indexID = 3;
  repeated string data_paths = 5;
  repeated common.KeyValuePair type_params = 6;
  repeated common.KeyValuePair index_params = 7;
  int64 num_rows = 8;
  schema.FieldSchema field_schema = 9;
  int64 segmentID = 10;
}

// IndexMeta is the pre-2.2.0 persisted index build state, keyed by
// indexBuildID, including the originating request and result file paths.
message IndexMeta {
  int64 indexBuildID = 1;
  common.IndexState state = 2;
  string fail_reason = 3;
  BuildIndexRequest req = 4;
  repeated string index_file_paths = 5;
  bool mark_deleted = 6;
  int64 nodeID = 7;
  int64 index_version = 8;
  bool recycled = 9;
  uint64 serialize_size = 10;
}

View File

@ -0,0 +1,280 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: legacy.proto
package legacypb
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
commonpb "github.com/milvus-io/milvus/api/commonpb"
schemapb "github.com/milvus-io/milvus/api/schemapb"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// NOTE(review): generated legacy protobuf code (source: legacy.proto), kept
// verbatim so the migration tool can decode pre-2.2.0 index build meta.
// Regenerate with protoc-gen-go instead of editing by hand.
type BuildIndexRequest struct {
	IndexBuildID         int64                    `protobuf:"varint,1,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"`
	IndexName            string                   `protobuf:"bytes,2,opt,name=index_name,json=indexName,proto3" json:"index_name,omitempty"`
	IndexID              int64                    `protobuf:"varint,3,opt,name=indexID,proto3" json:"indexID,omitempty"`
	DataPaths            []string                 `protobuf:"bytes,5,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"`
	TypeParams           []*commonpb.KeyValuePair `protobuf:"bytes,6,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"`
	IndexParams          []*commonpb.KeyValuePair `protobuf:"bytes,7,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"`
	NumRows              int64                    `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"`
	FieldSchema          *schemapb.FieldSchema    `protobuf:"bytes,9,opt,name=field_schema,json=fieldSchema,proto3" json:"field_schema,omitempty"`
	SegmentID            int64                    `protobuf:"varint,10,opt,name=segmentID,proto3" json:"segmentID,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                 `json:"-"`
	XXX_unrecognized     []byte                   `json:"-"`
	XXX_sizecache        int32                    `json:"-"`
}

func (m *BuildIndexRequest) Reset()         { *m = BuildIndexRequest{} }
func (m *BuildIndexRequest) String() string { return proto.CompactTextString(m) }
func (*BuildIndexRequest) ProtoMessage()    {}
func (*BuildIndexRequest) Descriptor() ([]byte, []int) {
	return fileDescriptor_4b5c555c498591f0, []int{0}
}
func (m *BuildIndexRequest) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_BuildIndexRequest.Unmarshal(m, b)
}
func (m *BuildIndexRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_BuildIndexRequest.Marshal(b, m, deterministic)
}
func (m *BuildIndexRequest) XXX_Merge(src proto.Message) {
	xxx_messageInfo_BuildIndexRequest.Merge(m, src)
}
func (m *BuildIndexRequest) XXX_Size() int {
	return xxx_messageInfo_BuildIndexRequest.Size(m)
}
func (m *BuildIndexRequest) XXX_DiscardUnknown() {
	xxx_messageInfo_BuildIndexRequest.DiscardUnknown(m)
}

var xxx_messageInfo_BuildIndexRequest proto.InternalMessageInfo

func (m *BuildIndexRequest) GetIndexBuildID() int64 {
	if m != nil {
		return m.IndexBuildID
	}
	return 0
}
func (m *BuildIndexRequest) GetIndexName() string {
	if m != nil {
		return m.IndexName
	}
	return ""
}
func (m *BuildIndexRequest) GetIndexID() int64 {
	if m != nil {
		return m.IndexID
	}
	return 0
}
func (m *BuildIndexRequest) GetDataPaths() []string {
	if m != nil {
		return m.DataPaths
	}
	return nil
}
func (m *BuildIndexRequest) GetTypeParams() []*commonpb.KeyValuePair {
	if m != nil {
		return m.TypeParams
	}
	return nil
}
func (m *BuildIndexRequest) GetIndexParams() []*commonpb.KeyValuePair {
	if m != nil {
		return m.IndexParams
	}
	return nil
}
func (m *BuildIndexRequest) GetNumRows() int64 {
	if m != nil {
		return m.NumRows
	}
	return 0
}
func (m *BuildIndexRequest) GetFieldSchema() *schemapb.FieldSchema {
	if m != nil {
		return m.FieldSchema
	}
	return nil
}
func (m *BuildIndexRequest) GetSegmentID() int64 {
	if m != nil {
		return m.SegmentID
	}
	return 0
}
// IndexMeta is the legacy (pre-2.2.0) per-build index meta record, keyed by
// index build ID in the old meta store.
// NOTE: generated protobuf code (protoc-gen-go) — do not edit by hand.
type IndexMeta struct {
	IndexBuildID         int64               `protobuf:"varint,1,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"`
	State                commonpb.IndexState `protobuf:"varint,2,opt,name=state,proto3,enum=milvus.proto.common.IndexState" json:"state,omitempty"`
	FailReason           string              `protobuf:"bytes,3,opt,name=fail_reason,json=failReason,proto3" json:"fail_reason,omitempty"`
	Req                  *BuildIndexRequest  `protobuf:"bytes,4,opt,name=req,proto3" json:"req,omitempty"`
	IndexFilePaths       []string            `protobuf:"bytes,5,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"`
	MarkDeleted          bool                `protobuf:"varint,6,opt,name=mark_deleted,json=markDeleted,proto3" json:"mark_deleted,omitempty"`
	NodeID               int64               `protobuf:"varint,7,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
	IndexVersion         int64               `protobuf:"varint,8,opt,name=index_version,json=indexVersion,proto3" json:"index_version,omitempty"`
	Recycled             bool                `protobuf:"varint,9,opt,name=recycled,proto3" json:"recycled,omitempty"`
	SerializeSize        uint64              `protobuf:"varint,10,opt,name=serialize_size,json=serializeSize,proto3" json:"serialize_size,omitempty"`
	XXX_NoUnkeyedLiteral struct{}            `json:"-"`
	XXX_unrecognized     []byte              `json:"-"`
	XXX_sizecache        int32               `json:"-"`
}

// Standard generated proto plumbing (Reset/String/Descriptor/XXX_*).
func (m *IndexMeta) Reset()         { *m = IndexMeta{} }
func (m *IndexMeta) String() string { return proto.CompactTextString(m) }
func (*IndexMeta) ProtoMessage()    {}
func (*IndexMeta) Descriptor() ([]byte, []int) {
	return fileDescriptor_4b5c555c498591f0, []int{1}
}
func (m *IndexMeta) XXX_Unmarshal(b []byte) error {
	return xxx_messageInfo_IndexMeta.Unmarshal(m, b)
}
func (m *IndexMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
	return xxx_messageInfo_IndexMeta.Marshal(b, m, deterministic)
}
func (m *IndexMeta) XXX_Merge(src proto.Message) {
	xxx_messageInfo_IndexMeta.Merge(m, src)
}
func (m *IndexMeta) XXX_Size() int {
	return xxx_messageInfo_IndexMeta.Size(m)
}
func (m *IndexMeta) XXX_DiscardUnknown() {
	xxx_messageInfo_IndexMeta.DiscardUnknown(m)
}

var xxx_messageInfo_IndexMeta proto.InternalMessageInfo

// Nil-safe generated getters: each returns the field's zero value on a nil
// receiver.
func (m *IndexMeta) GetIndexBuildID() int64 {
	if m != nil {
		return m.IndexBuildID
	}
	return 0
}
func (m *IndexMeta) GetState() commonpb.IndexState {
	if m != nil {
		return m.State
	}
	return commonpb.IndexState_IndexStateNone
}
func (m *IndexMeta) GetFailReason() string {
	if m != nil {
		return m.FailReason
	}
	return ""
}
func (m *IndexMeta) GetReq() *BuildIndexRequest {
	if m != nil {
		return m.Req
	}
	return nil
}
func (m *IndexMeta) GetIndexFilePaths() []string {
	if m != nil {
		return m.IndexFilePaths
	}
	return nil
}
func (m *IndexMeta) GetMarkDeleted() bool {
	if m != nil {
		return m.MarkDeleted
	}
	return false
}
func (m *IndexMeta) GetNodeID() int64 {
	if m != nil {
		return m.NodeID
	}
	return 0
}
func (m *IndexMeta) GetIndexVersion() int64 {
	if m != nil {
		return m.IndexVersion
	}
	return 0
}
func (m *IndexMeta) GetRecycled() bool {
	if m != nil {
		return m.Recycled
	}
	return false
}
func (m *IndexMeta) GetSerializeSize() uint64 {
	if m != nil {
		return m.SerializeSize
	}
	return 0
}
// Generated registration: makes the legacy message types and the embedded file
// descriptor known to the proto runtime at package load.
func init() {
	proto.RegisterType((*BuildIndexRequest)(nil), "milvus.proto.legacy.BuildIndexRequest")
	proto.RegisterType((*IndexMeta)(nil), "milvus.proto.legacy.IndexMeta")
}
func init() { proto.RegisterFile("legacy.proto", fileDescriptor_4b5c555c498591f0) }
var fileDescriptor_4b5c555c498591f0 = []byte{
// 511 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xc1, 0x6b, 0xdb, 0x30,
0x14, 0xc6, 0xc9, 0xdc, 0xa6, 0xf1, 0xb3, 0x5b, 0x36, 0x0d, 0x86, 0x56, 0x36, 0xea, 0x66, 0x6c,
0xf8, 0xb2, 0x04, 0x52, 0x0a, 0x3b, 0x67, 0xa6, 0x10, 0xc6, 0x46, 0x50, 0xa0, 0x87, 0x5d, 0x8c,
0x12, 0xbf, 0x24, 0x62, 0xb2, 0x9d, 0x4a, 0x72, 0xbb, 0xe4, 0x8f, 0xd8, 0x75, 0xff, 0xee, 0x90,
0xe4, 0xb6, 0x84, 0xf5, 0xd0, 0x9b, 0xbf, 0x9f, 0xf5, 0x3d, 0xf9, 0x7d, 0x9f, 0x21, 0x96, 0xb8,
0xe2, 0x8b, 0xed, 0x60, 0xa3, 0x6a, 0x53, 0x93, 0xd7, 0xa5, 0x90, 0xb7, 0x8d, 0xf6, 0x6a, 0xe0,
0x5f, 0x9d, 0xc6, 0x8b, 0xba, 0x2c, 0xeb, 0xca, 0xc3, 0xd3, 0x58, 0x2f, 0xd6, 0x58, 0x72, 0xaf,
0xfa, 0x7f, 0x03, 0x78, 0x35, 0x6e, 0x84, 0x2c, 0x26, 0x55, 0x81, 0xbf, 0x19, 0xde, 0x34, 0xa8,
0x0d, 0xe9, 0x43, 0x2c, 0xac, 0xf6, 0x6f, 0x32, 0xda, 0x49, 0x3a, 0x69, 0xc0, 0xf6, 0x18, 0x79,
0x0f, 0xe0, 0x74, 0x5e, 0xf1, 0x12, 0xe9, 0x8b, 0xa4, 0x93, 0x86, 0x2c, 0x74, 0xe4, 0x07, 0x2f,
0x91, 0x50, 0x38, 0x72, 0x62, 0x92, 0xd1, 0xc0, 0xb9, 0xef, 0xa5, 0x35, 0x16, 0xdc, 0xf0, 0x7c,
0xc3, 0xcd, 0x5a, 0xd3, 0xc3, 0x24, 0xb0, 0x46, 0x4b, 0xa6, 0x16, 0x90, 0x31, 0x44, 0x66, 0xbb,
0xc1, 0x7c, 0xc3, 0x15, 0x2f, 0x35, 0xed, 0x26, 0x41, 0x1a, 0x8d, 0xce, 0x07, 0x7b, 0x8b, 0xb5,
0x0b, 0x7d, 0xc3, 0xed, 0x35, 0x97, 0x0d, 0x4e, 0xb9, 0x50, 0x0c, 0xac, 0x6b, 0xea, 0x4c, 0x24,
0x6b, 0xbf, 0xff, 0x7e, 0xc8, 0xd1, 0x73, 0x87, 0x44, 0xce, 0xd6, 0x4e, 0x79, 0x0b, 0xbd, 0xaa,
0x29, 0x73, 0x55, 0xdf, 0x69, 0xda, 0xf3, 0x3b, 0x54, 0x4d, 0xc9, 0xea, 0x3b, 0x4d, 0xbe, 0x42,
0xbc, 0x14, 0x28, 0x8b, 0xdc, 0x87, 0x49, 0xc3, 0xa4, 0x93, 0x46, 0xa3, 0x64, 0xff, 0x82, 0x36,
0xe8, 0x2b, 0x7b, 0x70, 0xe6, 0x9e, 0x59, 0xb4, 0x7c, 0x14, 0xe4, 0x1d, 0x84, 0x1a, 0x57, 0x25,
0x56, 0x66, 0x92, 0x51, 0x70, 0x17, 0x3c, 0x82, 0xfe, 0x9f, 0x00, 0x42, 0x57, 0xca, 0x77, 0x34,
0xfc, 0x59, 0x8d, 0x5c, 0xc2, 0xa1, 0x36, 0xdc, 0xf8, 0x32, 0x4e, 0x46, 0x67, 0x4f, 0xae, 0xeb,
0x46, 0xce, 0xec, 0x31, 0xe6, 0x4f, 0x93, 0x33, 0x88, 0x96, 0x5c, 0xc8, 0x5c, 0x21, 0xd7, 0x75,
0xe5, 0xda, 0x0a, 0x19, 0x58, 0xc4, 0x1c, 0x21, 0x5f, 0x20, 0x50, 0x78, 0x43, 0x0f, 0xdc, 0x8e,
0x9f, 0x06, 0x4f, 0xfc, 0x62, 0x83, 0xff, 0x7e, 0x21, 0x66, 0x2d, 0x24, 0x85, 0x97, 0xbe, 0x87,
0xa5, 0x90, 0xb8, 0x57, 0xf8, 0x89, 0xe3, 0x57, 0x42, 0xa2, 0x6f, 0xfd, 0x1c, 0xe2, 0x92, 0xab,
0x5f, 0x79, 0x81, 0x12, 0x0d, 0x16, 0xb4, 0x9b, 0x74, 0xd2, 0x1e, 0x8b, 0x2c, 0xcb, 0x3c, 0x22,
0x6f, 0xa0, 0x5b, 0xd5, 0x05, 0x4e, 0x32, 0x7a, 0xe4, 0x96, 0x6f, 0x15, 0xf9, 0x00, 0xc7, 0xfe,
0x92, 0x5b, 0x54, 0x5a, 0xd4, 0x55, 0xdb, 0x95, 0xcf, 0xe6, 0xda, 0x33, 0x72, 0x0a, 0x3d, 0x85,
0x8b, 0xed, 0x42, 0x62, 0xe1, 0xca, 0xea, 0xb1, 0x07, 0x4d, 0x3e, 0xc2, 0x89, 0x46, 0x25, 0xb8,
0x14, 0x3b, 0xcc, 0xb5, 0xd8, 0xa1, 0x2b, 0xe3, 0x80, 0x1d, 0x3f, 0xd0, 0x99, 0xd8, 0xe1, 0xf8,
0xf2, 0xe7, 0xc5, 0x4a, 0x98, 0x75, 0x33, 0xb7, 0x51, 0x0e, 0x7d, 0x0a, 0x9f, 0x45, 0xdd, 0x3e,
0x0d, 0x45, 0x65, 0x50, 0x55, 0x5c, 0x0e, 0x5d, 0x30, 0x43, 0x1f, 0xcc, 0x66, 0x3e, 0xef, 0x3a,
0x7d, 0xf1, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x46, 0x37, 0x11, 0xcb, 0xa9, 0x03, 0x00, 0x00,
}

View File

@ -0,0 +1,19 @@
package legacy
import (
"fmt"
"github.com/milvus-io/milvus/cmd/tools/migration/utils"
)
// BuildCollectionIndexKey210 composes the pre-2.2.0 etcd key under which the
// index info of (collectionID, indexID) was stored.
func BuildCollectionIndexKey210(collectionID, indexID utils.UniqueID) string {
	suffix := fmt.Sprintf("/%d/%d", collectionID, indexID)
	return fmt.Sprintf("%s%s", IndexMetaBefore220Prefix, suffix)
}
// BuildSegmentIndexKey210 composes the pre-2.2.0 etcd key under which the
// segment index info of (segmentID, indexID) was stored.
func BuildSegmentIndexKey210(segmentID, indexID utils.UniqueID) string {
	suffix := fmt.Sprintf("/%d/%d", segmentID, indexID)
	return fmt.Sprintf("%s%s", SegmentIndexPrefixBefore220, suffix)
}
// BuildIndexBuildKey210 composes the pre-2.2.0 etcd key under which the
// index build meta of buildID was stored.
func BuildIndexBuildKey210(buildID utils.UniqueID) string {
	suffix := fmt.Sprintf("/%d", buildID)
	return fmt.Sprintf("%s%s", IndexBuildPrefixBefore220, suffix)
}

View File

@ -0,0 +1,11 @@
package main
import (
"os"
"github.com/milvus-io/milvus/cmd/tools/migration/command"
)
// main is the entry point of the meta migration tool. All flag parsing and
// sub-command dispatch is delegated to the command package.
func main() {
	command.Execute(os.Args)
}

View File

@ -0,0 +1,213 @@
package meta
import (
"fmt"
"sort"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
)
// alias210ToAlias220 converts a 2.1.x alias record (stored as a
// pb.CollectionInfo whose schema name carries the alias name) into the 2.2.x
// model.Alias. A nil record (tombstone) maps to nil.
func alias210ToAlias220(record *pb.CollectionInfo, ts Timestamp) *model.Alias {
	if record == nil {
		return nil
	}
	return &model.Alias{
		Name:         record.GetSchema().GetName(),
		CollectionID: record.GetID(),
		CreatedTime:  ts,
		State:        pb.AliasState_AliasCreated,
	}
}
// to220 converts every timestamped 2.1.x alias snapshot into its 2.2.x model
// representation, preserving each (alias, ts) entry (nil records included).
func (meta *TtAliasesMeta210) to220() (TtAliasesMeta220, error) {
	converted := make(TtAliasesMeta220)
	for alias, byTs := range *meta {
		for ts, record := range byTs {
			converted.AddAlias(alias, alias210ToAlias220(record, ts), ts)
		}
	}
	return converted, nil
}
// to220 converts the non-timestamped 2.1.x alias records to 2.2.x models;
// the created time is unknown here, so 0 is used.
func (meta *AliasesMeta210) to220() (AliasesMeta220, error) {
	converted := make(AliasesMeta220)
	for alias, record := range *meta {
		converted.AddAlias(alias, alias210ToAlias220(record, 0))
	}
	return converted, nil
}
// getLatestFieldIndexes picks, among all timestamped snapshots of a
// collection, the newest non-nil record and returns its field indexes together
// with its schema. Returns nil when there is no usable snapshot.
func getLatestFieldIndexes(colls map[Timestamp]*pb.CollectionInfo) *FieldIndexesWithSchema {
	// pair couples a snapshot timestamp with its collection record so the
	// snapshots can be ordered by timestamp.
	type pair struct {
		ts   Timestamp
		coll *pb.CollectionInfo
	}
	l := len(colls)
	// BUG FIX: the original used make([]pair, l) followed by append, which left
	// l zero-valued pairs (ts=0, coll=nil) at the front; after sorting,
	// pairs[l-1] always landed on a zero pair and the latest field indexes were
	// silently dropped. Pre-size capacity only.
	pairs := make([]pair, 0, l)
	for ts, coll := range colls {
		pairs = append(pairs, pair{ts: ts, coll: coll})
	}
	sort.Slice(pairs, func(i, j int) bool {
		return pairs[i].ts < pairs[j].ts
	})
	if l > 0 && pairs[l-1].coll != nil {
		return &FieldIndexesWithSchema{indexes: pairs[l-1].coll.GetFieldIndexes(), schema: pairs[l-1].coll.GetSchema()}
	}
	return nil
}
// collection210ToCollection220 converts a 2.1.x collection record to the 2.2.x
// model by delegating to the shared unmarshal helper.
func collection210ToCollection220(coll *pb.CollectionInfo) *model.Collection {
	return model.UnmarshalCollectionModel(coll)
}
// to220 converts all timestamped 2.1.x collection snapshots to 2.2.x models.
// It also extracts, per collection, the field indexes of the latest non-nil
// snapshot, since 2.2.x stores index meta separately from the collection.
func (meta *TtCollectionsMeta210) to220() (TtCollectionsMeta220, FieldIndexes210, error) {
	ttCollections := make(TtCollectionsMeta220)
	fieldIndexes := make(FieldIndexes210)
	for collectionID, colls := range *meta {
		if latest := getLatestFieldIndexes(colls); latest != nil {
			fieldIndexes.AddRecord(collectionID, latest.indexes, latest.schema)
		}
		for ts, coll := range colls {
			ttCollections.AddCollection(collectionID, collection210ToCollection220(coll), ts)
		}
	}
	return ttCollections, fieldIndexes, nil
}
// to220 converts the non-timestamped 2.1.x collection records to 2.2.x models
// and collects each collection's embedded field indexes along the way.
func (meta *CollectionsMeta210) to220() (CollectionsMeta220, FieldIndexes210, error) {
	collections := make(CollectionsMeta220)
	fieldIndexes := make(FieldIndexes210)
	for collectionID, coll := range *meta {
		fieldIndexes.AddRecord(collectionID, coll.GetFieldIndexes(), coll.GetSchema())
		collections.AddCollection(collectionID, collection210ToCollection220(coll))
	}
	return collections, fieldIndexes, nil
}
// combineToCollectionIndexesMeta220 joins the field-index references embedded
// in 2.1.x collection records with the standalone 2.1.x index meta, producing
// the 2.2.x per-collection index models. Collections with a missing schema are
// skipped (logged), since field lookup is impossible without one.
func combineToCollectionIndexesMeta220(fieldIndexes FieldIndexes210, collectionIndexes CollectionIndexesMeta210) (CollectionIndexesMeta220, error) {
	indexes := make(CollectionIndexesMeta220)
	for collectionID := range fieldIndexes {
		record := fieldIndexes[collectionID]
		if record.schema == nil {
			fmt.Println("combineToCollectionIndexesMeta220, nil schema: ", collectionID, ", record: ", record)
			continue
		}
		helper, err := typeutil.CreateSchemaHelper(record.schema)
		if err != nil {
			return nil, err
		}
		for _, index := range record.indexes {
			// GetFiledID: typo inherited from the generated proto accessor.
			field, err := helper.GetFieldFromID(index.GetFiledID())
			if err != nil {
				return nil, err
			}
			indexInfo, err := collectionIndexes.GetIndex(collectionID, index.GetIndexID())
			if err != nil {
				return nil, err
			}
			// FIX (readability): the original named this variable `record`,
			// shadowing the outer per-collection `record`; renamed to avoid the
			// shadow.
			indexModel := &model.Index{
				TenantID:     "", // TODO: how to set this if we support mysql later?
				CollectionID: collectionID,
				FieldID:      index.GetFiledID(),
				IndexID:      index.GetIndexID(),
				IndexName:    indexInfo.GetIndexName(),
				IsDeleted:    indexInfo.GetDeleted(),
				CreateTime:   indexInfo.GetCreateTime(),
				TypeParams:   field.GetTypeParams(),
				IndexParams:  indexInfo.GetIndexParams(),
			}
			indexes.AddRecord(collectionID, index.GetIndexID(), indexModel)
		}
	}
	return indexes, nil
}
// combineToSegmentIndexesMeta220 joins the 2.1.x per-segment index records
// with their index-build meta (looked up by build ID) into 2.2.x segment index
// models. A segment index whose build meta is missing is a hard error.
func combineToSegmentIndexesMeta220(segmentIndexes SegmentIndexesMeta210, indexBuildMeta IndexBuildMeta210) (SegmentIndexesMeta220, error) {
	segmentIndexModels := make(SegmentIndexesMeta220)
	for segID, byIndexID := range segmentIndexes {
		for indexID, record := range byIndexID {
			buildMeta, ok := indexBuildMeta[record.GetBuildID()]
			if !ok {
				return nil, fmt.Errorf("index build meta not found, segment id: %d, index id: %d, index build id: %d", segID, indexID, record.GetBuildID())
			}
			segmentIndexModels.AddRecord(segID, indexID, &model.SegmentIndex{
				SegmentID:      segID,
				CollectionID:   record.GetCollectionID(),
				PartitionID:    record.GetPartitionID(),
				NumRows:        0, // TODO: how to set this?
				IndexID:        indexID,
				BuildID:        record.GetBuildID(),
				NodeID:         buildMeta.GetNodeID(),
				IndexVersion:   buildMeta.GetIndexVersion(),
				IndexState:     buildMeta.GetState(),
				FailReason:     buildMeta.GetFailReason(),
				IsDeleted:      buildMeta.GetMarkDeleted(),
				CreateTime:     record.GetCreateTime(),
				IndexFilePaths: buildMeta.GetIndexFilePaths(),
				IndexSize:      buildMeta.GetSerializeSize(),
			})
		}
	}
	return segmentIndexModels, nil
}
// From210To220 converts a full 2.1.x meta snapshot into its 2.2.x equivalent.
// It refuses any input whose version is not exactly 2.1.0, then converts
// aliases and collections (timestamped and plain), merges the field indexes
// harvested from both collection sets, and joins them with the standalone
// index / build meta. Partition and field tables are initialized empty here;
// in 2.2.x they are split out of the collection record when it is marshaled
// (see the GenerateSaves options in meta220).
func From210To220(metas *Meta) (*Meta, error) {
	if !metas.Version.EQ(versions.Version210) {
		return nil, fmt.Errorf("version mismatch: %s", metas.Version.String())
	}
	ttAliases, err := metas.Meta210.TtAliases.to220()
	if err != nil {
		return nil, err
	}
	aliases, err := metas.Meta210.Aliases.to220()
	if err != nil {
		return nil, err
	}
	ttCollections, fieldIndexes, err := metas.Meta210.TtCollections.to220()
	if err != nil {
		return nil, err
	}
	collections, fieldIndexes2, err := metas.Meta210.Collections.to220()
	if err != nil {
		return nil, err
	}
	// Field indexes can come from either the snapshotted or the plain
	// collection records; merge them before joining with the index meta.
	fieldIndexes.Merge(fieldIndexes2)
	collectionIndexes, err := combineToCollectionIndexesMeta220(fieldIndexes, metas.Meta210.CollectionIndexes)
	if err != nil {
		return nil, err
	}
	segmentIndexes, err := combineToSegmentIndexesMeta220(metas.Meta210.SegmentIndexes, metas.Meta210.IndexBuildMeta)
	if err != nil {
		return nil, err
	}
	metas220 := &Meta{
		SourceVersion: metas.Version,
		Version:       versions.Version220,
		Meta220: &All220{
			TtCollections:     ttCollections,
			Collections:       collections,
			TtAliases:         ttAliases,
			Aliases:           aliases,
			TtPartitions:      make(TtPartitionsMeta220),
			Partitions:        make(PartitionsMeta220),
			TtFields:          make(TtFieldsMeta220),
			Fields:            make(FieldsMeta220),
			CollectionIndexes: collectionIndexes,
			SegmentIndexes:    segmentIndexes,
		},
	}
	return metas220, nil
}

View File

@ -0,0 +1,17 @@
package meta
import (
"github.com/blang/semver/v4"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// UniqueID aliases typeutil.UniqueID; used for collection/field/index/segment IDs.
type UniqueID = typeutil.UniqueID

// Timestamp aliases typeutil.Timestamp; the snapshot timestamp of meta records.
type Timestamp = typeutil.Timestamp

// Meta is a version-tagged container of a full meta snapshot. Exactly one of
// Meta210/Meta220 is populated, matching Version; SourceVersion records where
// a converted snapshot originally came from.
type Meta struct {
	SourceVersion semver.Version
	Version       semver.Version
	Meta210       *All210
	Meta220       *All220
}

View File

@ -0,0 +1,312 @@
package meta
import (
"fmt"
"github.com/milvus-io/milvus/cmd/tools/migration/legacy"
"github.com/milvus-io/milvus/cmd/tools/migration/legacy/legacypb"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/api/schemapb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
)
// FieldIndexesWithSchema pairs a collection's field-index references with the
// schema they refer to (the schema is needed to resolve field IDs).
type FieldIndexesWithSchema struct {
	indexes []*pb.FieldIndexInfo
	schema  *schemapb.CollectionSchema
}

type FieldIndexes210 map[UniqueID]*FieldIndexesWithSchema // coll_id -> field indexes.

type TtCollectionsMeta210 map[UniqueID]map[Timestamp]*pb.CollectionInfo // coll_id -> ts -> coll
type CollectionsMeta210 map[UniqueID]*pb.CollectionInfo                 // coll_id -> coll

type TtAliasesMeta210 map[string]map[Timestamp]*pb.CollectionInfo // alias name -> ts -> coll
type AliasesMeta210 map[string]*pb.CollectionInfo                 // alias name -> coll

type CollectionIndexesMeta210 map[UniqueID]map[UniqueID]*pb.IndexInfo        // coll_id -> index_id -> index
type SegmentIndexesMeta210 map[UniqueID]map[UniqueID]*pb.SegmentIndexInfo    // seg_id -> index_id -> segment index
type IndexBuildMeta210 map[UniqueID]*legacypb.IndexMeta                      // index_build_id -> index

type LastDDLRecords map[string]string // We don't care this since it didn't work.

// All210 aggregates every meta category a 2.1.x deployment persisted.
type All210 struct {
	TtAliases TtAliasesMeta210
	Aliases   AliasesMeta210

	TtCollections TtCollectionsMeta210
	Collections   CollectionsMeta210

	CollectionIndexes CollectionIndexesMeta210
	SegmentIndexes    SegmentIndexesMeta210
	IndexBuildMeta    IndexBuildMeta210

	LastDDLRecords LastDDLRecords
}
// GenerateSaves flattens every 2.1.x meta category into one kv map, keyed
// exactly as the legacy meta store laid the records out in etcd.
func (meta *All210) GenerateSaves() map[string]string {
	return merge(false,
		meta.TtAliases.GenerateSaves(),
		meta.Aliases.GenerateSaves(),
		meta.TtCollections.GenerateSaves(),
		meta.Collections.GenerateSaves(),
		meta.CollectionIndexes.GenerateSaves(),
		meta.SegmentIndexes.GenerateSaves(),
		meta.IndexBuildMeta.GenerateSaves(),
		meta.LastDDLRecords.GenerateSaves())
}
// merge folds the given kv maps into one, later maps overwriting earlier keys.
// With clone=false the first map is reused (and mutated) as the result; with
// clone=true a fresh map is built and every input is left untouched.
func merge(clone bool, kvs ...map[string]string) map[string]string {
	if len(kvs) == 0 {
		return map[string]string{}
	}
	ret, start := kvs[0], 1
	if clone {
		ret, start = make(map[string]string), 0
	}
	for _, kv := range kvs[start:] {
		for k, v := range kv {
			ret[k] = v
		}
	}
	return ret
}
// AddAlias records the alias state at timestamp ts, creating the inner
// snapshot map on first use.
func (meta *TtAliasesMeta210) AddAlias(alias string, info *pb.CollectionInfo, ts Timestamp) {
	if snapshots, ok := (*meta)[alias]; ok {
		snapshots[ts] = info
		return
	}
	(*meta)[alias] = map[Timestamp]*pb.CollectionInfo{ts: info}
}
// GenerateSaves serializes every (alias, ts) snapshot into the legacy snapshot
// key layout. A nil record is written as a tombstone; marshal failures panic,
// since the in-memory records were themselves unmarshaled from the store.
func (meta *TtAliasesMeta210) GenerateSaves() map[string]string {
	kvs := make(map[string]string)
	for alias, byTs := range *meta {
		for ts, record := range byTs {
			key := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, rootcoord.BuildAliasKey210(alias), rootcoord.SnapshotsSep, ts)
			var blob []byte
			if record == nil {
				blob = rootcoord.ConstructTombstone()
			} else {
				marshaled, err := proto.Marshal(record)
				if err != nil {
					panic(err)
				}
				blob = marshaled
			}
			kvs[key] = string(blob)
		}
	}
	return kvs
}
// AddAlias stores (or overwrites) the latest record for an alias.
func (meta *AliasesMeta210) AddAlias(alias string, info *pb.CollectionInfo) {
	(*meta)[alias] = info
}
// GenerateSaves serializes the plain (non-snapshot) alias records under the
// legacy alias keys; nil records become tombstones, marshal failures panic.
func (meta *AliasesMeta210) GenerateSaves() map[string]string {
	kvs := make(map[string]string)
	for alias, record := range *meta {
		key := rootcoord.BuildAliasKey210(alias)
		var blob []byte
		if record == nil {
			blob = rootcoord.ConstructTombstone()
		} else {
			marshaled, err := proto.Marshal(record)
			if err != nil {
				panic(err)
			}
			blob = marshaled
		}
		kvs[key] = string(blob)
	}
	return kvs
}
// AddCollection records the collection state at timestamp ts, creating the
// inner snapshot map on first use.
func (meta *TtCollectionsMeta210) AddCollection(collID UniqueID, coll *pb.CollectionInfo, ts Timestamp) {
	if snapshots, ok := (*meta)[collID]; ok {
		snapshots[ts] = coll
		return
	}
	(*meta)[collID] = map[Timestamp]*pb.CollectionInfo{ts: coll}
}
// GenerateSaves serializes every (collection, ts) snapshot into the legacy
// snapshot key layout; nil records become tombstones, marshal failures panic.
func (meta *TtCollectionsMeta210) GenerateSaves() map[string]string {
	kvs := make(map[string]string)
	for collection, byTs := range *meta {
		for ts, record := range byTs {
			key := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, rootcoord.BuildCollectionKey(collection), rootcoord.SnapshotsSep, ts)
			var blob []byte
			if record == nil {
				blob = rootcoord.ConstructTombstone()
			} else {
				marshaled, err := proto.Marshal(record)
				if err != nil {
					panic(err)
				}
				blob = marshaled
			}
			kvs[key] = string(blob)
		}
	}
	return kvs
}
// AddCollection stores (or overwrites) the latest record for a collection.
func (meta *CollectionsMeta210) AddCollection(collID UniqueID, coll *pb.CollectionInfo) {
	(*meta)[collID] = coll
}
// GenerateSaves serializes the plain collection records under the legacy
// collection keys; nil records become tombstones, marshal failures panic.
func (meta *CollectionsMeta210) GenerateSaves() map[string]string {
	kvs := make(map[string]string)
	for collection, record := range *meta {
		key := rootcoord.BuildCollectionKey(collection)
		var blob []byte
		if record == nil {
			blob = rootcoord.ConstructTombstone()
		} else {
			marshaled, err := proto.Marshal(record)
			if err != nil {
				panic(err)
			}
			blob = marshaled
		}
		kvs[key] = string(blob)
	}
	return kvs
}
// AddIndex records an index under its collection, creating the inner map on
// first use.
func (meta *CollectionIndexesMeta210) AddIndex(collectionID UniqueID, indexID UniqueID, index *pb.IndexInfo) {
	if indexes, ok := (*meta)[collectionID]; ok {
		indexes[indexID] = index
		return
	}
	(*meta)[collectionID] = map[UniqueID]*pb.IndexInfo{indexID: index}
}
// GetIndex looks up the index info of (collectionID, indexID), returning a
// descriptive error when either level of the lookup misses.
func (meta *CollectionIndexesMeta210) GetIndex(collectionID UniqueID, indexID UniqueID) (*pb.IndexInfo, error) {
	indexes, collExist := (*meta)[collectionID]
	if !collExist {
		return nil, fmt.Errorf("collection not exist: %d", collectionID)
	}
	index, indexExist := indexes[indexID]
	if !indexExist {
		return nil, fmt.Errorf("index not exist, collection: %d, index: %d", collectionID, indexID)
	}
	return index, nil
}
// GenerateSaves serializes every (collection, index) record under the legacy
// index key layout; nil records become tombstones, marshal failures panic.
func (meta *CollectionIndexesMeta210) GenerateSaves() map[string]string {
	kvs := make(map[string]string)
	for collectionID, byIndexID := range *meta {
		for indexID, record := range byIndexID {
			key := legacy.BuildCollectionIndexKey210(collectionID, indexID)
			var blob []byte
			if record == nil {
				blob = rootcoord.ConstructTombstone()
			} else {
				marshaled, err := proto.Marshal(record)
				if err != nil {
					panic(err)
				}
				blob = marshaled
			}
			kvs[key] = string(blob)
		}
	}
	return kvs
}
// AddIndex records a segment index under its segment, creating the inner map
// on first use.
func (meta *SegmentIndexesMeta210) AddIndex(segmentID UniqueID, indexID UniqueID, index *pb.SegmentIndexInfo) {
	if indexes, ok := (*meta)[segmentID]; ok {
		indexes[indexID] = index
		return
	}
	(*meta)[segmentID] = map[UniqueID]*pb.SegmentIndexInfo{indexID: index}
}
// GenerateSaves serializes every (segment, index) record under the legacy
// segment-index key layout; nil records become tombstones, marshal failures
// panic.
func (meta *SegmentIndexesMeta210) GenerateSaves() map[string]string {
	kvs := make(map[string]string)
	for segmentID, byIndexID := range *meta {
		for indexID, record := range byIndexID {
			key := legacy.BuildSegmentIndexKey210(segmentID, indexID)
			var blob []byte
			if record == nil {
				blob = rootcoord.ConstructTombstone()
			} else {
				marshaled, err := proto.Marshal(record)
				if err != nil {
					panic(err)
				}
				blob = marshaled
			}
			kvs[key] = string(blob)
		}
	}
	return kvs
}
// AddRecord stores (or overwrites) the build meta for an index build ID.
func (meta *IndexBuildMeta210) AddRecord(indexBuildID UniqueID, record *legacypb.IndexMeta) {
	(*meta)[indexBuildID] = record
}
// GenerateSaves serializes every build-meta record under the legacy index
// build key layout. Unlike the other 210 categories there is no tombstone
// case here; marshal failures panic.
func (meta *IndexBuildMeta210) GenerateSaves() map[string]string {
	kvs := make(map[string]string)
	for buildID, record := range *meta {
		marshaled, err := proto.Marshal(record)
		if err != nil {
			panic(err)
		}
		kvs[legacy.BuildIndexBuildKey210(buildID)] = string(marshaled)
	}
	return kvs
}
// AddRecord stores (or overwrites) a collection's field-index references
// together with the schema needed to resolve them.
func (meta *FieldIndexes210) AddRecord(collectionID UniqueID, fieldIndexes []*pb.FieldIndexInfo, schema *schemapb.CollectionSchema) {
	(*meta)[collectionID] = &FieldIndexesWithSchema{indexes: fieldIndexes, schema: schema}
}
// Merge copies every record from other into meta, overwriting records with
// the same collection ID.
func (meta *FieldIndexes210) Merge(other FieldIndexes210) {
	for collectionID, record := range other {
		meta.AddRecord(collectionID, record.indexes, record.schema)
	}
}
// AddRecord stores one raw last-DDL kv pair.
func (meta *LastDDLRecords) AddRecord(k, v string) {
	(*meta)[k] = v
}

// GenerateSaves returns the records as-is: these keys never took effect in
// 2.1.x (see the type comment), so they are copied through untouched.
func (meta *LastDDLRecords) GenerateSaves() map[string]string {
	return *meta
}

View File

@ -0,0 +1,248 @@
package meta
import (
"github.com/blang/semver/v4"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
"github.com/milvus-io/milvus/internal/metastore/kv/indexcoord"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
)
type TtCollectionsMeta220 map[UniqueID]map[Timestamp]*model.Collection // coll_id -> ts -> coll
type CollectionsMeta220 map[UniqueID]*model.Collection                 // coll_id -> coll

type TtAliasesMeta220 map[string]map[Timestamp]*model.Alias // alias name -> ts -> alias
type AliasesMeta220 map[string]*model.Alias                 // alias name -> alias

type TtPartitionsMeta220 map[UniqueID]map[Timestamp][]*model.Partition // coll_id -> ts -> partitions
type PartitionsMeta220 map[UniqueID][]*model.Partition                 // coll_id -> partitions

type TtFieldsMeta220 map[UniqueID]map[Timestamp][]*model.Field // coll_id -> ts -> fields
type FieldsMeta220 map[UniqueID][]*model.Field                 // coll_id -> fields

type CollectionIndexesMeta220 map[UniqueID]map[UniqueID]*model.Index     // coll_id -> index_id -> index
type SegmentIndexesMeta220 map[UniqueID]map[UniqueID]*model.SegmentIndex // seg_id -> index_id -> segment index
// GenerateSaves serializes every (collection, ts) snapshot into the 2.2.x
// snapshot key layout. When the source is older than 2.2.0 the marshal options
// keep fields and partitions embedded in the collection record. A nil
// collection is written as a tombstone.
func (meta *TtCollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (map[string]string, error) {
	saves := make(map[string]string)
	opts := make([]model.Option, 0)
	if sourceVersion.LT(versions.Version220) {
		opts = append(opts, model.WithFields(), model.WithPartitions())
	}
	for collectionID, byTs := range *meta {
		ckey := rootcoord.BuildCollectionKey(collectionID)
		for ts, collection := range byTs {
			key := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, ckey, rootcoord.SnapshotsSep, ts)
			if collection == nil {
				// save a tombstone.
				saves[key] = string(rootcoord.ConstructTombstone())
				continue
			}
			marshaled, err := proto.Marshal(model.MarshalCollectionModelWithOption(collection, opts...))
			if err != nil {
				return nil, err
			}
			saves[key] = string(marshaled)
		}
	}
	return saves, nil
}
// AddCollection records the collection model at timestamp ts, creating the
// inner snapshot map on first use.
func (meta *TtCollectionsMeta220) AddCollection(collectionID UniqueID, coll *model.Collection, ts Timestamp) {
	if snapshots, ok := (*meta)[collectionID]; ok {
		snapshots[ts] = coll
		return
	}
	(*meta)[collectionID] = map[Timestamp]*model.Collection{ts: coll}
}
// AddCollection stores (or overwrites) the latest model for a collection.
func (meta *CollectionsMeta220) AddCollection(collectionID UniqueID, coll *model.Collection) {
	(*meta)[collectionID] = coll
}
// GenerateSaves serializes the plain collection models under the 2.2.x
// collection keys. When the source is older than 2.2.0 the marshal options
// keep fields and partitions embedded in the collection record. A nil
// collection is written as a tombstone.
func (meta *CollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (map[string]string, error) {
	saves := make(map[string]string)
	opts := make([]model.Option, 0)
	if sourceVersion.LT(versions.Version220) {
		opts = append(opts, model.WithFields(), model.WithPartitions())
	}
	for collectionID, collection := range *meta {
		ckey := rootcoord.BuildCollectionKey(collectionID)
		if collection == nil {
			// save a tombstone.
			saves[ckey] = string(rootcoord.ConstructTombstone())
			continue
		}
		marshaled, err := proto.Marshal(model.MarshalCollectionModelWithOption(collection, opts...))
		if err != nil {
			return nil, err
		}
		saves[ckey] = string(marshaled)
	}
	return saves, nil
}
// AddAlias records the alias model at timestamp ts, creating the inner
// snapshot map on first use.
func (meta *TtAliasesMeta220) AddAlias(alias string, aliasInfo *model.Alias, ts Timestamp) {
	if snapshots, ok := (*meta)[alias]; ok {
		snapshots[ts] = aliasInfo
		return
	}
	(*meta)[alias] = map[Timestamp]*model.Alias{ts: aliasInfo}
}
// GenerateSaves serializes every (alias, ts) snapshot into the 2.2.x snapshot
// key layout. A nil alias is written as a tombstone.
func (meta *TtAliasesMeta220) GenerateSaves() (map[string]string, error) {
	saves := make(map[string]string)
	for alias, byTs := range *meta {
		ckey := rootcoord.BuildAliasKey(alias)
		for ts, aliasInfo := range byTs {
			key := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, ckey, rootcoord.SnapshotsSep, ts)
			if aliasInfo == nil {
				// save a tombstone.
				saves[key] = string(rootcoord.ConstructTombstone())
				continue
			}
			marshaled, err := proto.Marshal(model.MarshalAliasModel(aliasInfo))
			if err != nil {
				return nil, err
			}
			saves[key] = string(marshaled)
		}
	}
	return saves, nil
}
// AddAlias stores (or overwrites) the latest model for an alias.
func (meta *AliasesMeta220) AddAlias(alias string, aliasInfo *model.Alias) {
	(*meta)[alias] = aliasInfo
}
// GenerateSaves serializes the plain alias models under the 2.2.x alias keys.
// A nil alias is written as a tombstone.
func (meta *AliasesMeta220) GenerateSaves() (map[string]string, error) {
	saves := make(map[string]string)
	for alias, aliasInfo := range *meta {
		ckey := rootcoord.BuildAliasKey(alias)
		if aliasInfo == nil {
			// save a tombstone.
			saves[ckey] = string(rootcoord.ConstructTombstone())
			continue
		}
		marshaled, err := proto.Marshal(model.MarshalAliasModel(aliasInfo))
		if err != nil {
			return nil, err
		}
		saves[ckey] = string(marshaled)
	}
	return saves, nil
}
// AddRecord stores an index model under its collection, creating the inner
// map on first use.
func (meta *CollectionIndexesMeta220) AddRecord(collID UniqueID, indexID int64, record *model.Index) {
	if indexes, ok := (*meta)[collID]; ok {
		indexes[indexID] = record
		return
	}
	(*meta)[collID] = map[UniqueID]*model.Index{indexID: record}
}
// GenerateSaves serializes every (collection, index) model under the 2.2.x
// IndexCoord key layout. There is no tombstone case for index records.
func (meta *CollectionIndexesMeta220) GenerateSaves() (map[string]string, error) {
	saves := make(map[string]string)
	for collectionID, byIndexID := range *meta {
		for indexID, index := range byIndexID {
			key := indexcoord.BuildIndexKey(collectionID, indexID)
			marshaled, err := proto.Marshal(model.MarshalIndexModel(index))
			if err != nil {
				return nil, err
			}
			saves[key] = string(marshaled)
		}
	}
	return saves, nil
}
// GenerateSaves serializes every segment index model under the 2.2.x
// IndexCoord segment-index key layout (keyed by the IDs stored inside the
// record itself). There is no tombstone case for segment index records.
func (meta *SegmentIndexesMeta220) GenerateSaves() (map[string]string, error) {
	saves := make(map[string]string)
	for _, byIndexID := range *meta {
		for _, index := range byIndexID {
			key := indexcoord.BuildSegmentIndexKey(index.CollectionID, index.PartitionID, index.SegmentID, index.BuildID)
			marshaled, err := proto.Marshal(model.MarshalSegmentIndexModel(index))
			if err != nil {
				return nil, err
			}
			saves[key] = string(marshaled)
		}
	}
	return saves, nil
}
// AddRecord stores a segment index model under its segment, creating the
// inner map on first use.
func (meta *SegmentIndexesMeta220) AddRecord(segID UniqueID, indexID UniqueID, record *model.SegmentIndex) {
	if indexes, ok := (*meta)[segID]; ok {
		indexes[indexID] = record
		return
	}
	(*meta)[segID] = map[UniqueID]*model.SegmentIndex{indexID: record}
}
// All220 aggregates every meta category of a 2.2.x deployment. Partition and
// field tables exist as separate categories here, unlike 2.1.x where they were
// embedded in the collection record.
type All220 struct {
	TtCollections TtCollectionsMeta220
	Collections   CollectionsMeta220

	TtAliases TtAliasesMeta220
	Aliases   AliasesMeta220

	TtPartitions TtPartitionsMeta220
	Partitions   PartitionsMeta220

	TtFields TtFieldsMeta220
	Fields   FieldsMeta220

	CollectionIndexes CollectionIndexesMeta220
	SegmentIndexes    SegmentIndexesMeta220
}

View File

@ -0,0 +1,16 @@
package migration
import (
"github.com/milvus-io/milvus/cmd/tools/migration/meta"
)
// migrator210To220 is the Migrator implementation for the 2.1.x -> 2.2.x path.
type migrator210To220 struct {
}

// Migrate converts a 2.1.x meta snapshot to 2.2.x by delegating to the meta
// package's conversion routine.
func (m migrator210To220) Migrate(metas *meta.Meta) (*meta.Meta, error) {
	return meta.From210To220(metas)
}

// newMigrator210To220 constructs the (stateless) 2.1.x -> 2.2.x migrator.
func newMigrator210To220() *migrator210To220 {
	return &migrator210To220{}
}

View File

@ -0,0 +1,18 @@
package migration
import "github.com/milvus-io/milvus/internal/util/typeutil"
const (
	// Role is the session role the migration tool registers itself under.
	Role = "migration"
)

// Roles lists every Milvus component role whose sessions must be absent (and
// are watched) while a migration runs — a live component could mutate meta
// concurrently.
var Roles = []string{
	typeutil.RootCoordRole,
	typeutil.IndexCoordRole,
	typeutil.IndexNodeRole,
	typeutil.DataCoordRole,
	typeutil.DataNodeRole,
	typeutil.QueryCoordRole,
	typeutil.QueryNodeRole,
	typeutil.ProxyRole,
}

View File

@ -0,0 +1,21 @@
package migration
// Migration describes the full lifecycle of one meta migration run.
type Migration interface {
	// Validate checks whether the migration can be executed at all; for
	// example, migrating from a higher version to a lower version is not
	// allowed.
	Validate() error
	// CheckCompatible reports whether the target is already compatible with
	// the source; if so, no migration should be executed.
	CheckCompatible() bool
	// CheckSessions checks whether any component sessions are alive, and
	// aborts the migration if any are.
	CheckSessions() error
	// RegisterSession registers this tool's session so no other migration can
	// run concurrently; the registered session is deleted as soon as possible
	// after the migration is done.
	RegisterSession() error
	// Backup backs up the source meta information.
	Backup() error
	// Migrate migrates the meta to the target backend.
	Migrate() error
	// Rollback rolls the migration back.
	Rollback() error
	// Stop completes the migration workflow and releases resources.
	Stop()
}

View File

@ -0,0 +1,32 @@
package migration
import (
"fmt"
"github.com/blang/semver/v4"
"github.com/milvus-io/milvus/cmd/tools/migration/meta"
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
)
// Migrator converts a meta snapshot from one version's layout to another's.
type Migrator interface {
	Migrate(metas *meta.Meta) (*meta.Meta, error)
}
// NewMigrator selects a Migrator for the given version pair. Only the
// 2.1.x -> 2.2.x path is supported; anything else is rejected.
func NewMigrator(sourceVersion, targetVersion string) (Migrator, error) {
	source, err := semver.Parse(sourceVersion)
	if err != nil {
		return nil, err
	}
	target, err := semver.Parse(targetVersion)
	if err != nil {
		return nil, err
	}
	if !versions.Range21x(source) || !versions.Range22x(target) {
		return nil, fmt.Errorf("migration from source version to target version is forbidden, source: %s, target: %s",
			sourceVersion, targetVersion)
	}
	return newMigrator210To220(), nil
}

View File

@ -0,0 +1,214 @@
package migration
import (
"context"
"fmt"
"sync"
"time"
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
"github.com/blang/semver/v4"
"github.com/milvus-io/milvus/cmd/tools/migration/configs"
"github.com/milvus-io/milvus/cmd/tools/migration/console"
"github.com/milvus-io/milvus/cmd/tools/migration/backend"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil"
)
// Runner drives the whole migration flow: it owns the etcd client, the
// migration session, and the watcher goroutines that abort the run if any
// Milvus component comes online.
type Runner struct {
	ctx      context.Context    // child context; cancelled by Stop
	cancel   context.CancelFunc // cancels ctx, stopping the role watchers
	cfg      *configs.Config
	initOnce sync.Once
	session  *sessionutil.Session // this tool's own session under Role
	address  string               // unique pseudo-address identifying this instance
	etcdCli  *clientv3.Client
	wg       sync.WaitGroup // tracks one watcher goroutine per role
}

// NewRunner builds a Runner bound to a child context of ctx and eagerly runs
// one-time initialization (etcd client, session, role watchers).
func NewRunner(ctx context.Context, cfg *configs.Config) *Runner {
	ctx1, cancel := context.WithCancel(ctx)
	runner := &Runner{ctx: ctx1, cancel: cancel, cfg: cfg}
	// initOnce is technically redundant here (NewRunner is the only caller of
	// init and calls it exactly once), but it guards against future re-entry.
	runner.initOnce.Do(runner.init)
	return runner
}
// watchByPrefix blocks until either the runner context is cancelled or any
// session event is observed under prefix. A session event (a component going
// up or down) terminates the whole process via console.Exit, because the
// migration must not run concurrently with live Milvus components.
func (r *Runner) watchByPrefix(prefix string) {
	defer r.wg.Done()
	// Fetch the current revision so the watch starts from a consistent point.
	_, revision, err := r.session.GetSessions(prefix)
	console.ExitIf(err)
	eventCh := r.session.WatchServices(prefix, revision, nil)
	for {
		select {
		case <-r.ctx.Done():
			return
		case event := <-eventCh:
			// NOTE(review): if eventCh can be closed (e.g. the underlying etcd
			// watch is cancelled), this receive yields a zero-valued event and
			// event.Session may be nil — confirm WatchServices' close semantics.
			msg := fmt.Sprintf("session up/down, exit migration, event type: %s, session: %s", event.EventType.String(), event.Session.String())
			console.Exit(msg)
		}
	}
}
// WatchSessions spawns one watcher goroutine per Milvus role; each watcher
// aborts the process if a session for its role appears or disappears while
// the migration is running.
func (r *Runner) WatchSessions() {
	r.wg.Add(len(Roles))
	for _, role := range Roles {
		go r.watchByPrefix(role)
	}
}
// initEtcdCli connects to etcd using the configured endpoints; any failure is
// fatal and terminates the process.
func (r *Runner) initEtcdCli() {
	cli, err := etcd.GetEtcdClient(r.cfg.EtcdCfg)
	console.ExitIf(err)
	r.etcdCli = cli
}

// init performs one-time setup: the etcd client, this tool's own session
// rooted at the meta root path, and the per-role session watchers.
func (r *Runner) init() {
	r.initEtcdCli()
	r.session = sessionutil.NewSession(r.ctx, r.cfg.EtcdCfg.MetaRootPath, r.etcdCli)
	// address not important here.
	// The current timestamp merely gives this instance a unique address so
	// that checkMySelf can tell it apart from other running migrations.
	address := time.Now().String()
	r.address = address
	r.session.Init(Role, address, true, true)
	r.WatchSessions()
}
// Validate rejects downgrade migrations: the configured target version must be
// greater than or equal to the source version.
func (r *Runner) Validate() error {
	source, err := semver.Parse(r.cfg.SourceVersion)
	if err != nil {
		return err
	}
	target, err := semver.Parse(r.cfg.TargetVersion)
	if err != nil {
		return err
	}
	// Guard clause: only a strictly smaller target is rejected.
	if source.GT(target) {
		return fmt.Errorf("higher version to lower version is not allowed, "+
			"source version: %s, target version: %s", source.String(), target.String())
	}
	return nil
}
// CheckCompatible reports whether the target version is below 2.2.0, in which
// case it is treated as meta-compatible with the source and no migration is
// needed — presumably because versions below 2.2.0 share the old meta layout;
// TODO confirm.
// NOTE(review): a malformed target version silently yields false here instead
// of surfacing the parse error; Validate reports that error separately.
func (r *Runner) CheckCompatible() bool {
	// TODO: more accurate.
	target, err := semver.Parse(r.cfg.TargetVersion)
	if err != nil {
		return false
	}
	return target.LT(versions.Version220)
}
// checkSessionsWithPrefix returns an error when any session is registered
// under prefix; the migration may only proceed against an offline cluster.
func (r *Runner) checkSessionsWithPrefix(prefix string) error {
	sessions, _, err := r.session.GetSessions(prefix)
	if err != nil {
		return err
	}
	if n := len(sessions); n > 0 {
		return fmt.Errorf("there are still sessions alive, prefix: %s, num of alive sessions: %d", prefix, n)
	}
	return nil
}
// checkMySelf verifies that every session registered under the migration role
// belongs to this instance: a session whose address differs from ours means
// another migration tool is already running.
func (r *Runner) checkMySelf() error {
	sessions, _, err := r.session.GetSessions(Role)
	if err != nil {
		return err
	}
	for _, session := range sessions {
		if session.Address != r.address {
			return fmt.Errorf("other migration is running")
		}
	}
	return nil
}
// CheckSessions ensures the cluster is quiescent: no competing migration
// instance and no alive sessions for any Milvus component role.
func (r *Runner) CheckSessions() error {
	if err := r.checkMySelf(); err != nil {
		return err
	}
	for _, prefix := range Roles {
		if err := r.checkSessionsWithPrefix(prefix); err != nil {
			return err
		}
	}
	return nil
}

// RegisterSession publishes this runner's session so that concurrent
// migrations can detect each other, and starts the liveness keep-alive.
// NOTE(review): the empty LivenessCheck callback means a lost etcd lease is
// silently ignored — confirm that is acceptable here.
func (r *Runner) RegisterSession() error {
	r.session.Register()
	go r.session.LivenessCheck(r.ctx, func() {})
	return nil
}
// Backup loads every meta entry from the source-version backend and dumps it
// to the configured backup file.
func (r *Runner) Backup() error {
	src, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.SourceVersion)
	if err != nil {
		return err
	}
	loaded, err := src.Load()
	if err != nil {
		return err
	}
	return src.Backup(loaded, r.cfg.BackupFilePath)
}
// Rollback wipes both the source-layout and target-layout meta from the store
// and then restores the source meta from the backup file. Both Clean calls
// must run before Restore so that no half-migrated state survives.
func (r *Runner) Rollback() error {
	source, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.SourceVersion)
	if err != nil {
		return err
	}
	target, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.TargetVersion)
	if err != nil {
		return err
	}
	if err := source.Clean(); err != nil {
		return err
	}
	if err := target.Clean(); err != nil {
		return err
	}
	return source.Restore(r.cfg.BackupFilePath)
}

// Migrate runs the full flow: load the source meta, back it up to disk, clean
// the source meta, transform it with the version-specific migrator, and save
// the result through the target backend.
// NOTE(review): any failure after source.Clean() leaves the store empty; the
// only recovery path at that point is Rollback() using the backup file.
func (r *Runner) Migrate() error {
	migrator, err := NewMigrator(r.cfg.SourceVersion, r.cfg.TargetVersion)
	if err != nil {
		return err
	}
	source, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.SourceVersion)
	if err != nil {
		return err
	}
	metas, err := source.Load()
	if err != nil {
		return err
	}
	// Persist a backup before any destructive operation.
	if err := source.Backup(metas, r.cfg.BackupFilePath); err != nil {
		return err
	}
	if err := source.Clean(); err != nil {
		return err
	}
	targetMetas, err := migrator.Migrate(metas)
	if err != nil {
		return err
	}
	target, err := backend.NewBackend(r.cfg.MilvusConfig, r.cfg.TargetVersion)
	if err != nil {
		return err
	}
	return target.Save(targetMetas)
}

// Stop revokes this tool's session, cancels the runner context so the role
// watchers return, and waits for those goroutines to finish.
func (r *Runner) Stop() {
	r.session.Revoke(time.Second)
	r.cancel()
	r.wg.Wait()
}

View File

@ -0,0 +1,49 @@
package utils
import (
	"errors"
	"fmt"
	"strconv"
	"strings"

	"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)
// UniqueID and Timestamp mirror the aliases used throughout the Milvus meta code.
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp

// errNotOfTsKey marks a key that lacks the "_ts<timestamp>" snapshot suffix.
type errNotOfTsKey struct {
	key string
}

// Error implements the error interface.
func (e errNotOfTsKey) Error() string {
	return fmt.Sprintf("%s is not of snapshot", e.key)
}

// NewErrNotOfTsKey wraps key in an errNotOfTsKey error.
// NOTE(review): an exported constructor returning an unexported pointer type
// trips golint; callers should rely on IsErrNotOfTsKey rather than the type.
func NewErrNotOfTsKey(key string) *errNotOfTsKey {
	return &errNotOfTsKey{key: key}
}
// IsErrNotOfTsKey reports whether err — or any error it wraps — marks a key
// that is not a snapshot ts-key. Unlike the direct type assertion it replaces,
// errors.As also matches errors wrapped with fmt.Errorf("...: %w", err), so
// callers may add context without breaking this check.
func IsErrNotOfTsKey(err error) bool {
	var target *errNotOfTsKey
	return errors.As(err, &target)
}
// SplitBySeparator splits a snapshot ts-key of the form "<key>_ts<timestamp>"
// into its key and timestamp parts. It splits on the LAST occurrence of the
// separator so keys that themselves contain "_ts" are still handled, and it
// parses the timestamp as an unsigned 64-bit integer so large timestamps do
// not overflow on 32-bit builds (strconv.Atoi would), and negative values are
// rejected instead of wrapping around.
//
// Returns an errNotOfTsKey (see IsErrNotOfTsKey) when s carries no separator,
// and a plain error when the trailing part is not a valid timestamp.
func SplitBySeparator(s string) (key string, ts Timestamp, err error) {
	idx := strings.LastIndex(s, rootcoord.SnapshotsSep)
	if idx < 0 {
		return "", 0, NewErrNotOfTsKey(s)
	}
	convertedTs, err := strconv.ParseUint(s[idx+len(rootcoord.SnapshotsSep):], 10, 64)
	if err != nil {
		return "", 0, fmt.Errorf("%s is not of snapshot", s)
	}
	return s[:idx], Timestamp(convertedTs), nil
}
// GetFileName returns the final slash-separated component of p; when p
// contains no slash, p itself is returned unchanged.
func GetFileName(p string) string {
	// Slicing past the last '/' (or from 0 when there is none, since
	// LastIndex returns -1) matches splitting on '/' and taking the tail.
	return p[strings.LastIndex(p, "/")+1:]
}

View File

@ -0,0 +1,40 @@
package utils
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestSplitBySeparator checks that a well-formed ts-key is split into its key
// part and numeric timestamp.
func TestSplitBySeparator(t *testing.T) {
	tsKey := "435783141193354561_ts435783141193154564"
	k, ts, err := SplitBySeparator(tsKey)
	assert.NoError(t, err)
	assert.Equal(t, "435783141193354561", k)
	assert.Equal(t, Timestamp(435783141193154564), ts)
}

// TestGetFileName table-tests extraction of the last path component.
// NOTE(review): the table entries leave `name` empty, so subtests show up as
// anonymous — consider naming them.
func TestGetFileName(t *testing.T) {
	type args struct {
		p string
	}
	tests := []struct {
		name string
		args args
		want string
	}{
		{
			args: args{p: "snapshots/root-coord/collection/436611447439428929_ts436611447439228933"},
			want: "436611447439428929_ts436611447439228933",
		},
		{
			args: args{p: "root-coord/collection/436611447439428929"},
			want: "436611447439428929",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			assert.Equalf(t, tt.want, GetFileName(tt.args.p), "GetFileName(%v)", tt.args.p)
		})
	}
}

View File

@ -0,0 +1,32 @@
package versions

import "github.com/blang/semver/v4"

// String forms of the version boundaries used by the range checks below.
const (
	version210Str = "2.1.0"
	version220Str = "2.2.0"
	version230Str = "2.3.0"
	// VersionMaxStr acts as an open upper bound ("infinity") for range checks.
	VersionMaxStr = "1000.1000.1000"
)

// Parsed counterparts of the string constants above, populated once in init.
var (
	Version220 semver.Version
	Version210 semver.Version
	Version230 semver.Version
	VersionMax semver.Version
)
// init parses the version-string constants exactly once at package load.
// MustParse replaces the previous pattern of discarding the Parse error: the
// inputs are compile-time constants, so a parse failure is a programmer error
// and should panic immediately rather than silently leave zero-valued
// versions that would corrupt every subsequent range check.
func init() {
	Version210 = semver.MustParse(version210Str)
	Version220 = semver.MustParse(version220Str)
	Version230 = semver.MustParse(version230Str)
	VersionMax = semver.MustParse(VersionMaxStr)
}
// Range21x reports whether version falls in [2.1.0, 2.2.0).
func Range21x(version semver.Version) bool {
	return version.GTE(Version210) && version.LT(Version220)
}

// Range22x reports whether version falls in [2.2.0, 2.3.0).
func Range22x(version semver.Version) bool {
	return version.GTE(Version220) && version.LT(Version230)
}

View File

@ -0,0 +1,77 @@
package versions
import (
"testing"
"github.com/blang/semver/v4"
)
// TestRange21x verifies the [2.1.0, 2.2.0) boundaries: both ends above the
// range (and the exclusive 2.2.0 bound) are rejected, 2.1.0 is included.
func TestRange21x(t *testing.T) {
	type args struct {
		version semver.Version
	}
	tests := []struct {
		name string
		args args
		want bool
	}{
		{
			args: args{version: VersionMax},
			want: false,
		},
		{
			args: args{version: Version230},
			want: false,
		},
		{
			args: args{version: Version220},
			want: false,
		},
		{
			args: args{version: Version210},
			want: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := Range21x(tt.args.version); got != tt.want {
				t.Errorf("Range21x() = %v, want %v", got, tt.want)
			}
		})
	}
}

// TestRange22x verifies the [2.2.0, 2.3.0) boundaries: 2.2.0 is included,
// 2.3.0 and above as well as 2.1.0 are excluded.
func TestRange22x(t *testing.T) {
	type args struct {
		version semver.Version
	}
	tests := []struct {
		name string
		args args
		want bool
	}{
		{
			args: args{version: VersionMax},
			want: false,
		},
		{
			args: args{version: Version230},
			want: false,
		},
		{
			args: args{version: Version220},
			want: true,
		},
		{
			args: args{version: Version210},
			want: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := Range22x(tt.args.version); got != tt.want {
				t.Errorf("Range22x() = %v, want %v", got, tt.want)
			}
		})
	}
}

21
go.mod
View File

@ -5,7 +5,7 @@ go 1.18
require (
github.com/99designs/keyring v1.2.1 // indirect
github.com/BurntSushi/toml v1.0.0
github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e
@ -20,7 +20,6 @@ require (
github.com/gin-gonic/gin v1.7.7
github.com/go-basic/ipv4 v1.0.0
github.com/gofrs/flock v0.8.1
github.com/golang/mock v1.5.0
github.com/golang/protobuf v1.5.2
github.com/google/btree v1.0.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
@ -61,9 +60,14 @@ require (
stathat.com/c/consistent v1.0.0
)
require github.com/apache/thrift v0.15.0
require (
github.com/apache/thrift v0.15.0 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
)
require github.com/sandertv/go-formula/v2 v2.0.0-alpha.7 // indirect
require github.com/sandertv/go-formula/v2 v2.0.0-alpha.7
require github.com/quasilyte/go-ruleguard/dsl v0.3.21 // indirect
require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
@ -141,7 +145,6 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/quasilyte/go-ruleguard/dsl v0.3.21 // indirect
github.com/rs/xid v1.2.1 // indirect
github.com/samber/lo v1.27.0
github.com/sirupsen/logrus v1.8.1 // indirect
@ -202,8 +205,6 @@ replace (
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
)
replace (
// If you want to use the hook interceptor, the following code should be commented out
// and you should modify the api version to be the same as the `so` project.
github.com/milvus-io/milvus/api => ./api
)
// If you want to use the hook interceptor, the following code should be commented out
// and you should modify the api version to be the same as the `so` project.
replace github.com/milvus-io/milvus/api => ./api

11
go.sum
View File

@ -463,7 +463,6 @@ github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b h1:xYEM2oBUhBE
github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b/go.mod h1:V0HF/ZBlN86HqewcDC/cVxMmYDiRukWjSrgKLUAn9Js=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 h1:IVlcvV0CjvfBYYod5ePe89l+3LBAl//6n9kJ9Vr2i0k=
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76/go.mod h1:Iu9BHUvTh8/KpbuSoKx/CaJEdJvFxSverxIy7I+nq7s=
github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
@ -486,14 +485,13 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.8 h1:3tS41NlGYSmhhe/8fhGRzc+z3AYCw1Fe1WAyLuujKs0=
github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus/api v0.0.0-20220915082433-b1f4c60117bb h1:vKuSxdR+egH8P7QOgO/WP0c2JpLnGPpt1DF6bSE6tmQ=
github.com/milvus-io/milvus/api v0.0.0-20220915082433-b1f4c60117bb/go.mod h1:ZhlXJ8rZ90MBqJZNniyzcT8dibpZ8wRW2c1Tcb8/6oo=
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=
github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
@ -502,8 +500,6 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v7 v7.0.10 h1:1oUKe4EOPUEhw2qnPQaPsJ0lmVTYLFu03SiItauXs94=
github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo=
github.com/minio/minio-go/v7 v7.0.17 h1:5SiS3pqiQDbNhmXMxtqn2HzAInbN5cbHT7ip9F0F07E=
github.com/minio/minio-go/v7 v7.0.17/go.mod h1:SyQ1IFeJuaa+eV5yEDxW7hYE1s5VVq5sgImDe27R+zg=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
@ -676,6 +672,7 @@ github.com/stretchr/testify v1.7.4 h1:wZRexSlwd7ZXfKINDLsO4r7WBt3gTKONc6K/VesHvH
github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ=
@ -775,7 +772,6 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
@ -795,7 +791,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 h1:/5Bs7sWi0i3rOVO5KnM55OwugpsD4bRW1zywKoZjbkI=
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756/go.mod h1:b9TAUYHmRtqA6klRHApnXMnj+OyLce4yF5cZCUbk2ps=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=

View File

@ -21,16 +21,16 @@ type Catalog struct {
Txn kv.TxnKV
}
func buildIndexKey(collectionID, indexID int64) string {
func BuildIndexKey(collectionID, indexID int64) string {
return fmt.Sprintf("%s/%d/%d", util.FieldIndexPrefix, collectionID, indexID)
}
func buildSegmentIndexKey(collectionID, partitionID, segmentID, buildID int64) string {
func BuildSegmentIndexKey(collectionID, partitionID, segmentID, buildID int64) string {
return fmt.Sprintf("%s/%d/%d/%d/%d", util.SegmentIndexPrefix, collectionID, partitionID, segmentID, buildID)
}
func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error {
key := buildIndexKey(index.CollectionID, index.IndexID)
key := BuildIndexKey(index.CollectionID, index.IndexID)
value, err := proto.Marshal(model.MarshalIndexModel(index))
if err != nil {
@ -73,7 +73,7 @@ func (kc *Catalog) AlterIndex(ctx context.Context, index *model.Index) error {
func (kc *Catalog) AlterIndexes(ctx context.Context, indexes []*model.Index) error {
kvs := make(map[string]string)
for _, index := range indexes {
key := buildIndexKey(index.CollectionID, index.IndexID)
key := BuildIndexKey(index.CollectionID, index.IndexID)
value, err := proto.Marshal(model.MarshalIndexModel(index))
if err != nil {
@ -86,7 +86,7 @@ func (kc *Catalog) AlterIndexes(ctx context.Context, indexes []*model.Index) err
}
func (kc *Catalog) DropIndex(ctx context.Context, collID typeutil.UniqueID, dropIdxID typeutil.UniqueID) error {
key := buildIndexKey(collID, dropIdxID)
key := BuildIndexKey(collID, dropIdxID)
err := kc.Txn.Remove(key)
if err != nil {
@ -99,7 +99,7 @@ func (kc *Catalog) DropIndex(ctx context.Context, collID typeutil.UniqueID, drop
}
func (kc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error {
key := buildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID)
key := BuildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID)
value, err := proto.Marshal(model.MarshalSegmentIndexModel(segIdx))
if err != nil {
@ -143,7 +143,7 @@ func (kc *Catalog) AlterSegmentIndex(ctx context.Context, segIdx *model.SegmentI
func (kc *Catalog) AlterSegmentIndexes(ctx context.Context, segIdxes []*model.SegmentIndex) error {
kvs := make(map[string]string)
for _, segIdx := range segIdxes {
key := buildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID)
key := BuildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID)
value, err := proto.Marshal(model.MarshalSegmentIndexModel(segIdx))
if err != nil {
return err
@ -154,7 +154,7 @@ func (kc *Catalog) AlterSegmentIndexes(ctx context.Context, segIdxes []*model.Se
}
func (kc *Catalog) DropSegmentIndex(ctx context.Context, collID, partID, segID, buildID typeutil.UniqueID) error {
key := buildSegmentIndexKey(collID, partID, segID, buildID)
key := BuildSegmentIndexKey(collID, partID, segID, buildID)
err := kc.Txn.Remove(key)
if err != nil {

View File

@ -40,27 +40,31 @@ type Catalog struct {
Snapshot kv.SnapShotKV
}
func buildCollectionKey(collectionID typeutil.UniqueID) string {
func BuildCollectionKey(collectionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
}
func buildPartitionPrefix(collectionID typeutil.UniqueID) string {
func BuildPartitionPrefix(collectionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", PartitionMetaPrefix, collectionID)
}
func buildPartitionKey(collectionID, partitionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", buildPartitionPrefix(collectionID), partitionID)
func BuildPartitionKey(collectionID, partitionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", BuildPartitionPrefix(collectionID), partitionID)
}
func buildFieldPrefix(collectionID typeutil.UniqueID) string {
func BuildFieldPrefix(collectionID typeutil.UniqueID) string {
return fmt.Sprintf("%s/%d", FieldMetaPrefix, collectionID)
}
func buildFieldKey(collectionID typeutil.UniqueID, fieldID int64) string {
return fmt.Sprintf("%s/%d", buildFieldPrefix(collectionID), fieldID)
func BuildFieldKey(collectionID typeutil.UniqueID, fieldID int64) string {
return fmt.Sprintf("%s/%d", BuildFieldPrefix(collectionID), fieldID)
}
func buildAliasKey(aliasName string) string {
func BuildAliasKey210(alias string) string {
return fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix210, alias)
}
func BuildAliasKey(aliasName string) string {
return fmt.Sprintf("%s/%s", AliasMetaPrefix, aliasName)
}
@ -83,7 +87,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
return fmt.Errorf("cannot create collection with state: %s, collection: %s", coll.State.String(), coll.Name)
}
k1 := buildCollectionKey(coll.CollectionID)
k1 := BuildCollectionKey(coll.CollectionID)
collInfo := model.MarshalCollectionModel(coll)
v1, err := proto.Marshal(collInfo)
if err != nil {
@ -104,7 +108,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
// save partition info to newly path.
for _, partition := range coll.Partitions {
k := buildPartitionKey(coll.CollectionID, partition.PartitionID)
k := BuildPartitionKey(coll.CollectionID, partition.PartitionID)
partitionInfo := model.MarshalPartitionModel(partition)
v, err := proto.Marshal(partitionInfo)
if err != nil {
@ -117,7 +121,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
// save fields info to newly path.
for _, field := range coll.Fields {
k := buildFieldKey(coll.CollectionID, field.FieldID)
k := BuildFieldKey(coll.CollectionID, field.FieldID)
fieldInfo := model.MarshalFieldModel(field)
v, err := proto.Marshal(fieldInfo)
if err != nil {
@ -134,7 +138,7 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
}
func (kc *Catalog) loadCollection(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
collKey := buildCollectionKey(collectionID)
collKey := BuildCollectionKey(collectionID)
collVal, err := kc.Snapshot.Load(collKey, ts)
if err != nil {
return nil, common.NewCollectionNotExistError(fmt.Sprintf("collection not found: %d", collectionID))
@ -167,7 +171,7 @@ func (kc *Catalog) CreatePartition(ctx context.Context, partition *model.Partiti
if partitionVersionAfter210(collMeta) {
// save to newly path.
k := buildPartitionKey(partition.CollectionID, partition.PartitionID)
k := BuildPartitionKey(partition.CollectionID, partition.PartitionID)
partitionInfo := model.MarshalPartitionModel(partition)
v, err := proto.Marshal(partitionInfo)
if err != nil {
@ -189,7 +193,7 @@ func (kc *Catalog) CreatePartition(ctx context.Context, partition *model.Partiti
collMeta.PartitionNames = append(collMeta.PartitionNames, partition.PartitionName)
collMeta.PartitionCreatedTimestamps = append(collMeta.PartitionCreatedTimestamps, partition.PartitionCreatedTimestamp)
k := buildCollectionKey(partition.CollectionID)
k := BuildCollectionKey(partition.CollectionID)
v, err := proto.Marshal(collMeta)
if err != nil {
return err
@ -198,8 +202,8 @@ func (kc *Catalog) CreatePartition(ctx context.Context, partition *model.Partiti
}
func (kc *Catalog) CreateAlias(ctx context.Context, alias *model.Alias, ts typeutil.Timestamp) error {
oldKBefore210 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias.Name)
k := buildAliasKey(alias.Name)
oldKBefore210 := BuildAliasKey210(alias.Name)
k := BuildAliasKey(alias.Name)
aliasInfo := model.MarshalAliasModel(alias)
v, err := proto.Marshal(aliasInfo)
if err != nil {
@ -231,7 +235,7 @@ func (kc *Catalog) AlterCredential(ctx context.Context, credential *model.Creden
}
func (kc *Catalog) listPartitionsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Partition, error) {
prefix := buildPartitionPrefix(collectionID)
prefix := BuildPartitionPrefix(collectionID)
_, values, err := kc.Snapshot.LoadWithPrefix(prefix, ts)
if err != nil {
return nil, err
@ -253,7 +257,7 @@ func fieldVersionAfter210(collMeta *pb.CollectionInfo) bool {
}
func (kc *Catalog) listFieldsAfter210(ctx context.Context, collectionID typeutil.UniqueID, ts typeutil.Timestamp) ([]*model.Field, error) {
prefix := buildFieldPrefix(collectionID)
prefix := BuildFieldPrefix(collectionID)
_, values, err := kc.Snapshot.LoadWithPrefix(prefix, ts)
if err != nil {
return nil, err
@ -324,22 +328,22 @@ func (kc *Catalog) AlterAlias(ctx context.Context, alias *model.Alias, ts typeut
}
func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error {
collectionKey := buildCollectionKey(collectionInfo.CollectionID)
collectionKey := BuildCollectionKey(collectionInfo.CollectionID)
var delMetakeysSnap []string
for _, alias := range collectionInfo.Aliases {
delMetakeysSnap = append(delMetakeysSnap,
fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
BuildAliasKey210(alias),
)
}
// Snapshot will list all (k, v) pairs and then use Txn.MultiSave to save tombstone for these keys when it prepares
// to remove a prefix, so though we have very few prefixes, the final operations may exceed the max txn number.
// TODO(longjiquan): should we list all partitions & fields in KV anyway?
for _, partition := range collectionInfo.Partitions {
delMetakeysSnap = append(delMetakeysSnap, buildPartitionKey(collectionInfo.CollectionID, partition.PartitionID))
delMetakeysSnap = append(delMetakeysSnap, BuildPartitionKey(collectionInfo.CollectionID, partition.PartitionID))
}
for _, field := range collectionInfo.Fields {
delMetakeysSnap = append(delMetakeysSnap, buildFieldKey(collectionInfo.CollectionID, field.FieldID))
delMetakeysSnap = append(delMetakeysSnap, BuildFieldKey(collectionInfo.CollectionID, field.FieldID))
}
// delMetakeysSnap = append(delMetakeysSnap, buildPartitionPrefix(collectionInfo.CollectionID))
// delMetakeysSnap = append(delMetakeysSnap, buildFieldPrefix(collectionInfo.CollectionID))
@ -370,7 +374,7 @@ func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *mod
oldCollClone.CreateTime = newColl.CreateTime
oldCollClone.ConsistencyLevel = newColl.ConsistencyLevel
oldCollClone.State = newColl.State
key := buildCollectionKey(oldColl.CollectionID)
key := BuildCollectionKey(oldColl.CollectionID)
value, err := proto.Marshal(model.MarshalCollectionModel(oldCollClone))
if err != nil {
return err
@ -394,7 +398,7 @@ func (kc *Catalog) alterModifyPartition(oldPart *model.Partition, newPart *model
oldPartClone.PartitionName = newPartClone.PartitionName
oldPartClone.PartitionCreatedTimestamp = newPartClone.PartitionCreatedTimestamp
oldPartClone.State = newPartClone.State
key := buildPartitionKey(oldPart.CollectionID, oldPart.PartitionID)
key := BuildPartitionKey(oldPart.CollectionID, oldPart.PartitionID)
value, err := proto.Marshal(model.MarshalPartitionModel(oldPartClone))
if err != nil {
return err
@ -437,11 +441,11 @@ func (kc *Catalog) DropPartition(ctx context.Context, collectionID typeutil.Uniq
}
if partitionVersionAfter210(collMeta) {
k := buildPartitionKey(collectionID, partitionID)
k := BuildPartitionKey(collectionID, partitionID)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k}, ts)
}
k := buildCollectionKey(collectionID)
k := BuildCollectionKey(collectionID)
dropPartition(collMeta, partitionID)
v, err := proto.Marshal(collMeta)
if err != nil {
@ -462,8 +466,8 @@ func (kc *Catalog) DropCredential(ctx context.Context, username string) error {
}
func (kc *Catalog) DropAlias(ctx context.Context, alias string, ts typeutil.Timestamp) error {
oldKBefore210 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias)
k := buildAliasKey(alias)
oldKBefore210 := BuildAliasKey210(alias)
k := BuildAliasKey(alias)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k, oldKBefore210}, ts)
}
@ -519,7 +523,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, ts typeutil.Timestamp) (
}
func (kc *Catalog) listAliasesBefore210(ctx context.Context, ts typeutil.Timestamp) ([]*model.Alias, error) {
_, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, ts)
_, values, err := kc.Snapshot.LoadWithPrefix(CollectionAliasMetaPrefix210, ts)
if err != nil {
return nil, err
}

View File

@ -770,7 +770,7 @@ func TestCatalog_AlterCollection(t *testing.T) {
newC := &model.Collection{CollectionID: collectionID, State: pb.CollectionState_CollectionCreated}
err := kc.AlterCollection(ctx, oldC, newC, metastore.MODIFY, 0)
assert.NoError(t, err)
key := buildCollectionKey(collectionID)
key := BuildCollectionKey(collectionID)
value, ok := kvs[key]
assert.True(t, ok)
var collPb pb.CollectionInfo
@ -821,7 +821,7 @@ func TestCatalog_AlterPartition(t *testing.T) {
newP := &model.Partition{PartitionID: partitionID, CollectionID: collectionID, State: pb.PartitionState_PartitionCreated}
err := kc.AlterPartition(ctx, oldP, newP, metastore.MODIFY, 0)
assert.NoError(t, err)
key := buildPartitionKey(collectionID, partitionID)
key := BuildPartitionKey(collectionID, partitionID)
value, ok := kvs[key]
assert.True(t, ok)
var partPb pb.PartitionInfo

View File

@ -11,8 +11,11 @@ const (
AliasMetaPrefix = ComponentPrefix + "/aliases"
FieldMetaPrefix = ComponentPrefix + "/fields"
// CollectionAliasMetaPrefix prefix for collection alias meta
CollectionAliasMetaPrefix = ComponentPrefix + "/collection-alias"
// CollectionAliasMetaPrefix210 prefix for collection alias meta
CollectionAliasMetaPrefix210 = ComponentPrefix + "/collection-alias"
SnapshotsSep = "_ts"
SnapshotPrefix = "snapshots"
// CommonCredentialPrefix subpath for common credential
/* #nosec G101 */

View File

@ -27,6 +27,8 @@ import (
"strings"
"sync"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
@ -39,6 +41,16 @@ var (
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
)
// IsTombstone reports whether value equals the tombstone marker that
// SuffixSnapshot stores for deleted keys. Exported because the migration tool
// needs to apply exactly the same check.
func IsTombstone(value string) bool {
	return bytes.Equal([]byte(value), SuffixSnapshotTombstone)
}

// ConstructTombstone returns a copy of the tombstone marker bytes; handing out
// a clone keeps callers from mutating the shared sentinel. Exported for the
// migration tool as well.
func ConstructTombstone() []byte {
	return common.CloneByteSlice(SuffixSnapshotTombstone)
}
// SuffixSnapshot implements SnapshotKV
// this is a simple replacement for MetaSnapshot, which is not available due to etcd compaction
// SuffixSnapshot record timestamp as prefix of a key under the Snapshot prefix path
@ -106,7 +118,7 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps
// isTombstone helper function to check whether is tombstone mark
func (ss *SuffixSnapshot) isTombstone(value string) bool {
return bytes.Equal([]byte(value), SuffixSnapshotTombstone)
return IsTombstone(value)
}
// hideRootPrefix helper function to hide root prefix from key
@ -120,11 +132,17 @@ func (ss *SuffixSnapshot) composeSnapshotPrefix(key string) string {
return path.Join(ss.snapshotPrefix, key+ss.separator)
}
// ComposeSnapshotKey joins snapshotPrefix with "[key][separator][ts]" — the
// canonical ts-key layout. Exported so the migration tool composes snapshot
// keys with exactly the same rule, in case these rules ever change.
func ComposeSnapshotKey(snapshotPrefix string, key string, separator string, ts typeutil.Timestamp) string {
	// [key][sep][ts]
	return path.Join(snapshotPrefix, fmt.Sprintf("%s%s%d", key, separator, ts))
}
// composeTSKey unified tsKey composing method
// uses key, ts and separator to form a key
func (ss *SuffixSnapshot) composeTSKey(key string, ts typeutil.Timestamp) string {
// [key][sep][ts]
return path.Join(ss.snapshotPrefix, fmt.Sprintf("%s%s%d", key, ss.separator, ts))
return ComposeSnapshotKey(ss.snapshotPrefix, key, ss.separator, ts)
}
// isTSKey checks whether a key is in ts-key format

View File

@ -77,14 +77,6 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
}
}
filedIDToIndexIDs := make([]common.Int64Tuple, len(coll.FieldIndexes))
for idx, fieldIndexInfo := range coll.FieldIndexes {
filedIDToIndexIDs[idx] = common.Int64Tuple{
Key: fieldIndexInfo.FiledID,
Value: fieldIndexInfo.IndexID,
}
}
return &Collection{
CollectionID: coll.ID,
Name: coll.Schema.Name,
@ -106,6 +98,33 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
// MarshalCollectionModel marshal only collection-related information.
// partitions, aliases and fields won't be marshaled. They should be written to newly path.
func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo {
	cfg := newDefaultConfig()
	return marshalCollectionModelWithConfig(coll, cfg)
}
// config controls which sub-entities are embedded when a Collection is
// marshaled into a pb.CollectionInfo (see marshalCollectionModelWithConfig).
type config struct {
	withFields     bool // also marshal field schemas into the collection schema
	withPartitions bool // also marshal partition ids/names/created-timestamps
}
// Option mutates a marshaling config; see WithFields and WithPartitions.
type Option func(c *config)
// newDefaultConfig returns a config with both fields and partitions
// marshaling disabled (the zero value of config).
func newDefaultConfig() *config {
	return &config{}
}
// WithFields enables marshaling of the collection's field schemas.
func WithFields() Option {
	return func(cfg *config) {
		cfg.withFields = true
	}
}
// WithPartitions enables marshaling of the collection's partitions.
func WithPartitions() Option {
	return func(cfg *config) {
		cfg.withPartitions = true
	}
}
func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.CollectionInfo {
if coll == nil {
return nil
}
@ -116,16 +135,12 @@ func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo {
AutoID: coll.AutoID,
}
partitions := make([]*pb.PartitionInfo, len(coll.Partitions))
for idx, partition := range coll.Partitions {
partitions[idx] = &pb.PartitionInfo{
PartitionID: partition.PartitionID,
PartitionName: partition.PartitionName,
PartitionCreatedTimestamp: partition.PartitionCreatedTimestamp,
}
if c.withFields {
fields := MarshalFieldModels(coll.Fields)
collSchema.Fields = fields
}
return &pb.CollectionInfo{
collectionPb := &pb.CollectionInfo{
ID: coll.CollectionID,
Schema: collSchema,
CreateTime: coll.CreateTime,
@ -137,4 +152,22 @@ func MarshalCollectionModel(coll *Collection) *pb.CollectionInfo {
State: coll.State,
Properties: coll.Properties,
}
if c.withPartitions {
for _, partition := range coll.Partitions {
collectionPb.PartitionNames = append(collectionPb.PartitionNames, partition.PartitionName)
collectionPb.PartitionIDs = append(collectionPb.PartitionIDs, partition.PartitionID)
collectionPb.PartitionCreatedTimestamps = append(collectionPb.PartitionCreatedTimestamps, partition.PartitionCreatedTimestamp)
}
}
return collectionPb
}
// MarshalCollectionModelWithOption marshals coll after applying the given
// Options (e.g. WithFields, WithPartitions) on top of the defaults.
func MarshalCollectionModelWithOption(coll *Collection, opts ...Option) *pb.CollectionInfo {
	cfg := newDefaultConfig()
	for _, apply := range opts {
		apply(cfg)
	}
	return marshalCollectionModelWithConfig(coll, cfg)
}

View File

@ -2,8 +2,6 @@ package rootcoord
const (
// TODO: better to make them configurable, use default value if no config was set since we never explode these before.
snapshotsSep = "_ts"
snapshotPrefix = "snapshots"
globalIDAllocatorKey = "idTimestamp"
globalIDAllocatorSubPath = "gid"
globalTSOAllocatorKey = "timestamp"

View File

@ -43,12 +43,6 @@ const (
// TimestampPrefix prefix for timestamp
TimestampPrefix = rootcoord.ComponentPrefix + "/timestamp"
// DDOperationPrefix prefix for DD operation
DDOperationPrefix = rootcoord.ComponentPrefix + "/dd-operation"
// DDMsgSendPrefix prefix to indicate whether DD msg has been send
DDMsgSendPrefix = rootcoord.ComponentPrefix + "/dd-msg-send"
// CreateCollectionDDType name of DD type for create collection
CreateCollectionDDType = "CreateCollection"

View File

@ -350,7 +350,7 @@ func (c *Core) initMetaTable() error {
return err
}
if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, snapshotsSep, Params.EtcdCfg.MetaRootPath, snapshotPrefix); err != nil {
if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath, kvmetestore.SnapshotPrefix); err != nil {
return err
}

View File

@ -82,6 +82,14 @@ type BaseTable struct {
YamlFile string
}
// NewBaseTableFromYamlOnly only used in migration tool.
// Maybe we shouldn't limit the configDir internally.
// It builds a BaseTable whose config manager is sourced solely from the
// given yaml file.
func NewBaseTableFromYamlOnly(yaml string) *BaseTable {
	// NOTE(review): the error from config.Init is deliberately dropped, so a
	// missing or unreadable yaml yields a table that returns empty values
	// rather than failing (TestNewBaseTableFromYamlOnly exercises exactly
	// this) — confirm that silent fallback is intended.
	mgr, _ := config.Init(config.WithFilesSource(yaml))
	gp := &BaseTable{mgr: mgr, YamlFile: yaml}
	return gp
}
// GlobalInitWithYaml initializes the param table with the given yaml.
// We will update the global DefaultYaml variable directly, once and for all.
// GlobalInitWithYaml shall be called at the very beginning before initiating the base table.

View File

@ -283,3 +283,11 @@ func Test_SetLogger(t *testing.T) {
assert.Equal(t, true, grpclog.V(2))
})
}
// TestNewBaseTableFromYamlOnly checks that a table built from a nonexistent
// yaml file still answers lookups with empty values instead of failing.
func TestNewBaseTableFromYamlOnly(t *testing.T) {
	gp := NewBaseTableFromYamlOnly("not_exist.yaml")
	assert.Empty(t, gp.Get("key"))
}

View File

@ -219,6 +219,10 @@ type MetaStoreConfig struct {
// init wires the base table into the config and loads the meta-store
// settings into memory.
func (p *MetaStoreConfig) init(base *BaseTable) {
	p.Base = base
	p.LoadCfgToMemory()
}
// LoadCfgToMemory (re)loads the meta-store settings from the base table.
// Exported so callers (presumably the migration tool — confirm) can refresh
// the config without re-running init.
func (p *MetaStoreConfig) LoadCfgToMemory() {
	p.initMetaStoreType()
}

View File

@ -33,6 +33,17 @@ var GlobalParams paramtable.ComponentParam
// SessionEventType session event type
type SessionEventType int
// String returns a readable name for the session event type; unknown
// values (including SessionNoneEvent) map to the empty string.
func (t SessionEventType) String() string {
	if t == SessionAddEvent {
		return "SessionAddEvent"
	}
	if t == SessionDelEvent {
		return "SessionDelEvent"
	}
	return ""
}
// Rewatch defines the behavior outer session watch handles ErrCompacted
// it should process the current full list of session
// and returns err if meta error or anything else goes wrong

View File

@ -685,3 +685,20 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
wg.Wait()
assert.False(t, s2.isStandby.Load().(bool))
}
// TestSessionEventType_String verifies the readable names produced by
// SessionEventType.String for each known event type.
func TestSessionEventType_String(t *testing.T) {
	cases := []struct {
		name string
		evt  SessionEventType
		want string
	}{
		{evt: SessionNoneEvent, want: ""},
		{evt: SessionAddEvent, want: "SessionAddEvent"},
		{evt: SessionDelEvent, want: "SessionDelEvent"},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equalf(t, tc.want, tc.evt.String(), "String()")
		})
	}
}

View File

@ -52,6 +52,8 @@ mkdir -p datapb
mkdir -p querypb
mkdir -p planpb
mkdir -p ../../cmd/tools/migration/legacy/legacypb
${protoc} --proto_path="${GOOGLE_PROTO_DIR}" --proto_path=. \
--go_opt="Mmilvus.proto=github.com/milvus-io/milvus/api/milvuspb;milvuspb" \
--go_opt=Mcommon.proto=github.com/milvus-io/milvus/api/commonpb \
@ -86,4 +88,7 @@ ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./querypb query_coord.
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./planpb plan.proto
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./segcorepb segcore.proto
${protoc_opt} --proto_path=../../cmd/tools/migration/legacy/ \
--go_out=plugins=grpc,paths=source_relative:../../cmd/tools/migration/legacy/legacypb legacy.proto
popd