mirror of https://github.com/milvus-io/milvus.git
298 lines
8.8 KiB
Go
298 lines
8.8 KiB
Go
package meta
|
|
|
|
import (
|
|
"github.com/blang/semver/v4"
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
|
|
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
|
|
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
|
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
|
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
|
)
|
|
|
|
type TtCollectionsMeta220 map[UniqueID]map[Timestamp]*model.Collection // coll_id -> ts -> coll
|
|
type CollectionsMeta220 map[UniqueID]*model.Collection // coll_id -> coll
|
|
|
|
type TtAliasesMeta220 map[string]map[Timestamp]*model.Alias // alias name -> ts -> coll
|
|
type AliasesMeta220 map[string]*model.Alias // alias name -> coll
|
|
|
|
type TtPartitionsMeta220 map[UniqueID]map[Timestamp][]*model.Partition // coll_id -> ts -> partitions
|
|
type PartitionsMeta220 map[UniqueID][]*model.Partition // coll_id -> ts -> partitions
|
|
|
|
type TtFieldsMeta220 map[UniqueID]map[Timestamp][]*model.Field // coll_id -> ts -> fields
|
|
type FieldsMeta220 map[UniqueID][]*model.Field // coll_id -> ts -> fields
|
|
|
|
type CollectionIndexesMeta220 map[UniqueID]map[UniqueID]*model.Index // coll_id -> index_id -> index
|
|
type SegmentIndexesMeta220 map[UniqueID]map[UniqueID]*model.SegmentIndex // seg_id -> index_id -> segment index
|
|
|
|
type CollectionLoadInfo220 map[UniqueID]*model.CollectionLoadInfo // collectionID -> CollectionLoadInfo
|
|
type PartitionLoadInfo220 map[UniqueID]map[UniqueID]*model.PartitionLoadInfo // collectionID, partitionID -> PartitionLoadInfo
|
|
|
|
func (meta *TtCollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (map[string]string, error) {
|
|
saves := make(map[string]string)
|
|
|
|
opts := make([]model.Option, 0)
|
|
if sourceVersion.LT(versions.Version220) {
|
|
opts = append(opts, model.WithFields())
|
|
opts = append(opts, model.WithPartitions())
|
|
}
|
|
|
|
for collectionID := range *meta {
|
|
for ts := range (*meta)[collectionID] {
|
|
ckey := rootcoord.BuildCollectionKey(collectionID)
|
|
key := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, ckey, rootcoord.SnapshotsSep, ts)
|
|
collection := (*meta)[collectionID][ts]
|
|
var value string
|
|
if collection == nil {
|
|
// save a tombstone.
|
|
value = string(rootcoord.ConstructTombstone())
|
|
} else {
|
|
collectionPb := model.MarshalCollectionModelWithOption(collection, opts...)
|
|
marshaledCollectionPb, err := proto.Marshal(collectionPb)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
value = string(marshaledCollectionPb)
|
|
}
|
|
saves[key] = value
|
|
}
|
|
}
|
|
|
|
return saves, nil
|
|
}
|
|
|
|
func (meta *TtCollectionsMeta220) AddCollection(collectionID UniqueID, coll *model.Collection, ts Timestamp) {
|
|
_, collExist := (*meta)[collectionID]
|
|
if collExist {
|
|
(*meta)[collectionID][ts] = coll
|
|
} else {
|
|
(*meta)[collectionID] = map[Timestamp]*model.Collection{
|
|
ts: coll,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (meta *CollectionsMeta220) AddCollection(collectionID UniqueID, coll *model.Collection) {
|
|
(*meta)[collectionID] = coll
|
|
}
|
|
|
|
func (meta *CollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (map[string]string, error) {
|
|
saves := make(map[string]string)
|
|
|
|
opts := make([]model.Option, 0)
|
|
if sourceVersion.LT(versions.Version220) {
|
|
opts = append(opts, model.WithFields())
|
|
opts = append(opts, model.WithPartitions())
|
|
}
|
|
|
|
for collectionID := range *meta {
|
|
ckey := rootcoord.BuildCollectionKey(collectionID)
|
|
collection := (*meta)[collectionID]
|
|
var value string
|
|
if collection == nil {
|
|
// save a tombstone.
|
|
value = string(rootcoord.ConstructTombstone())
|
|
} else {
|
|
collectionPb := model.MarshalCollectionModelWithOption(collection, opts...)
|
|
marshaledCollectionPb, err := proto.Marshal(collectionPb)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
value = string(marshaledCollectionPb)
|
|
}
|
|
saves[ckey] = value
|
|
}
|
|
|
|
return saves, nil
|
|
}
|
|
|
|
func (meta *TtAliasesMeta220) AddAlias(alias string, aliasInfo *model.Alias, ts Timestamp) {
|
|
_, aliasExist := (*meta)[alias]
|
|
if aliasExist {
|
|
(*meta)[alias][ts] = aliasInfo
|
|
} else {
|
|
(*meta)[alias] = map[Timestamp]*model.Alias{
|
|
ts: aliasInfo,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (meta *TtAliasesMeta220) GenerateSaves() (map[string]string, error) {
|
|
saves := make(map[string]string)
|
|
|
|
for alias := range *meta {
|
|
for ts := range (*meta)[alias] {
|
|
ckey := rootcoord.BuildAliasKey(alias)
|
|
key := rootcoord.ComposeSnapshotKey(rootcoord.SnapshotPrefix, ckey, rootcoord.SnapshotsSep, ts)
|
|
aliasInfo := (*meta)[alias][ts]
|
|
var value string
|
|
if aliasInfo == nil {
|
|
// save a tombstone.
|
|
value = string(rootcoord.ConstructTombstone())
|
|
} else {
|
|
aliasPb := model.MarshalAliasModel(aliasInfo)
|
|
marshaledAliasPb, err := proto.Marshal(aliasPb)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
value = string(marshaledAliasPb)
|
|
}
|
|
saves[key] = value
|
|
}
|
|
}
|
|
|
|
return saves, nil
|
|
}
|
|
|
|
func (meta *AliasesMeta220) AddAlias(alias string, aliasInfo *model.Alias) {
|
|
(*meta)[alias] = aliasInfo
|
|
}
|
|
|
|
func (meta *AliasesMeta220) GenerateSaves() (map[string]string, error) {
|
|
saves := make(map[string]string)
|
|
|
|
for alias := range *meta {
|
|
ckey := rootcoord.BuildAliasKey(alias)
|
|
aliasInfo := (*meta)[alias]
|
|
var value string
|
|
if aliasInfo == nil {
|
|
// save a tombstone.
|
|
value = string(rootcoord.ConstructTombstone())
|
|
} else {
|
|
aliasPb := model.MarshalAliasModel(aliasInfo)
|
|
marshaledAliasPb, err := proto.Marshal(aliasPb)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
value = string(marshaledAliasPb)
|
|
}
|
|
saves[ckey] = value
|
|
}
|
|
|
|
return saves, nil
|
|
}
|
|
|
|
func (meta *CollectionIndexesMeta220) AddRecord(collID UniqueID, indexID int64, record *model.Index) {
|
|
if _, collExist := (*meta)[collID]; !collExist {
|
|
(*meta)[collID] = map[UniqueID]*model.Index{
|
|
indexID: record,
|
|
}
|
|
} else {
|
|
(*meta)[collID][indexID] = record
|
|
}
|
|
}
|
|
|
|
func (meta *CollectionIndexesMeta220) GenerateSaves() (map[string]string, error) {
|
|
saves := make(map[string]string)
|
|
|
|
for collectionID := range *meta {
|
|
for indexID := range (*meta)[collectionID] {
|
|
ckey := datacoord.BuildIndexKey(collectionID, indexID)
|
|
index := (*meta)[collectionID][indexID]
|
|
var value string
|
|
indexPb := model.MarshalIndexModel(index)
|
|
marshaledIndexPb, err := proto.Marshal(indexPb)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
value = string(marshaledIndexPb)
|
|
saves[ckey] = value
|
|
}
|
|
}
|
|
|
|
return saves, nil
|
|
}
|
|
|
|
func (meta *SegmentIndexesMeta220) GenerateSaves() (map[string]string, error) {
|
|
saves := make(map[string]string)
|
|
|
|
for segmentID := range *meta {
|
|
for indexID := range (*meta)[segmentID] {
|
|
index := (*meta)[segmentID][indexID]
|
|
ckey := datacoord.BuildSegmentIndexKey(index.CollectionID, index.PartitionID, index.SegmentID, index.BuildID)
|
|
var value string
|
|
indexPb := model.MarshalSegmentIndexModel(index)
|
|
marshaledIndexPb, err := proto.Marshal(indexPb)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
value = string(marshaledIndexPb)
|
|
saves[ckey] = value
|
|
}
|
|
}
|
|
|
|
return saves, nil
|
|
}
|
|
|
|
func (meta *SegmentIndexesMeta220) AddRecord(segID UniqueID, indexID UniqueID, record *model.SegmentIndex) {
|
|
if _, segExist := (*meta)[segID]; !segExist {
|
|
(*meta)[segID] = map[UniqueID]*model.SegmentIndex{
|
|
indexID: record,
|
|
}
|
|
} else {
|
|
(*meta)[segID][indexID] = record
|
|
}
|
|
}
|
|
|
|
func (meta *CollectionLoadInfo220) GenerateSaves() (map[string]string, error) {
|
|
saves := make(map[string]string)
|
|
for _, loadInfo := range *meta {
|
|
k := querycoord.EncodeCollectionLoadInfoKey(loadInfo.CollectionID)
|
|
v, err := proto.Marshal(&querypb.CollectionLoadInfo{
|
|
CollectionID: loadInfo.CollectionID,
|
|
ReleasedPartitions: loadInfo.ReleasedPartitionIDs,
|
|
ReplicaNumber: loadInfo.ReplicaNumber,
|
|
Status: loadInfo.Status,
|
|
FieldIndexID: loadInfo.FieldIndexID,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
saves[k] = string(v)
|
|
}
|
|
return saves, nil
|
|
}
|
|
|
|
func (meta *PartitionLoadInfo220) GenerateSaves() (map[string]string, error) {
|
|
saves := make(map[string]string)
|
|
for _, partitions := range *meta {
|
|
for _, loadInfo := range partitions {
|
|
k := querycoord.EncodePartitionLoadInfoKey(loadInfo.CollectionID, loadInfo.PartitionID)
|
|
v, err := proto.Marshal(&querypb.PartitionLoadInfo{
|
|
CollectionID: loadInfo.CollectionID,
|
|
PartitionID: loadInfo.PartitionID,
|
|
ReplicaNumber: loadInfo.ReplicaNumber,
|
|
Status: loadInfo.Status,
|
|
FieldIndexID: loadInfo.FieldIndexID,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
saves[k] = string(v)
|
|
}
|
|
}
|
|
return saves, nil
|
|
}
|
|
|
|
type All220 struct {
|
|
TtCollections TtCollectionsMeta220
|
|
Collections CollectionsMeta220
|
|
|
|
TtAliases TtAliasesMeta220
|
|
Aliases AliasesMeta220
|
|
|
|
TtPartitions TtPartitionsMeta220
|
|
Partitions PartitionsMeta220
|
|
|
|
TtFields TtFieldsMeta220
|
|
Fields FieldsMeta220
|
|
|
|
CollectionIndexes CollectionIndexesMeta220
|
|
SegmentIndexes SegmentIndexesMeta220
|
|
|
|
// QueryCoord Meta
|
|
CollectionLoadInfos CollectionLoadInfo220
|
|
PartitionLoadInfos PartitionLoadInfo220
|
|
}
|