mirror of https://github.com/milvus-io/milvus.git
[skip e2e] Fix empty index build meta migration (#20206)
Signed-off-by: longjiquan <jiquan.long@zilliz.com> Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/20211/head
parent
0159480378
commit
e9cd2cb42a
|
@ -0,0 +1,7 @@
|
|||
package allocator
|
||||
|
||||
import "github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
type Allocator interface {
|
||||
AllocID() (typeutil.UniqueID, error)
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package allocator
|
||||
|
||||
import "sort"
|
||||
|
||||
func makeClone(s []int64) []int64 {
|
||||
clone := make([]int64, len(s))
|
||||
copy(clone, s)
|
||||
return clone
|
||||
}
|
||||
|
||||
func NewAllocatorFromList(s []int64, useClone, deAsc bool) *AtomicAllocator {
|
||||
initialized, delta := int64(defaultInitializedValue), int64(defaultDelta)
|
||||
clone := s
|
||||
if useClone {
|
||||
clone = makeClone(s)
|
||||
}
|
||||
sort.Slice(clone, func(i, j int) bool { return clone[i] < clone[j] })
|
||||
l := len(clone)
|
||||
if l == 0 {
|
||||
// no change
|
||||
} else if deAsc {
|
||||
initialized, delta = clone[0], -1
|
||||
} else {
|
||||
initialized, delta = clone[l-1], 1
|
||||
}
|
||||
return NewAllocator(WithInitializedValue(initialized), WithDelta(delta))
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package allocator
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestAllocatorFromList(t *testing.T) {
|
||||
t.Run("de asc", func(t *testing.T) {
|
||||
s := []int64{100000, 10000, 1000}
|
||||
alloc := NewAllocatorFromList(s, true, true)
|
||||
n := 100
|
||||
wg := &sync.WaitGroup{}
|
||||
for i := 0; i < n; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
id, err := alloc.AllocID()
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, id < 1000)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
assert.Equal(t, int64(-1), alloc.delta)
|
||||
assert.Equal(t, int64(1000-n), alloc.now.Load())
|
||||
})
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package allocator
|
||||
|
||||
import (
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultInitializedValue = 0
|
||||
defaultDelta = 1
|
||||
)
|
||||
|
||||
type AtomicAllocator struct {
|
||||
now atomic.Int64
|
||||
delta int64
|
||||
}
|
||||
|
||||
type Option func(allocator *AtomicAllocator)
|
||||
|
||||
func WithInitializedValue(value int64) Option {
|
||||
return func(allocator *AtomicAllocator) {
|
||||
allocator.now.Store(value)
|
||||
}
|
||||
}
|
||||
|
||||
func WithDelta(delta int64) Option {
|
||||
return func(allocator *AtomicAllocator) {
|
||||
allocator.delta = delta
|
||||
}
|
||||
}
|
||||
|
||||
func (alloc *AtomicAllocator) apply(opts ...Option) {
|
||||
for _, opt := range opts {
|
||||
opt(alloc)
|
||||
}
|
||||
}
|
||||
|
||||
func (alloc *AtomicAllocator) AllocID() (typeutil.UniqueID, error) {
|
||||
id := alloc.now.Add(alloc.delta)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func NewAllocator(opts ...Option) *AtomicAllocator {
|
||||
alloc := &AtomicAllocator{
|
||||
now: *atomic.NewInt64(defaultInitializedValue),
|
||||
delta: defaultDelta,
|
||||
}
|
||||
alloc.apply(opts...)
|
||||
return alloc
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package allocator
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestAtomicAllocator_AllocID(t *testing.T) {
|
||||
n := 100
|
||||
alloc := NewAllocator()
|
||||
wg := &sync.WaitGroup{}
|
||||
for i := 0; i < n; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := alloc.AllocID()
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
assert.Equal(t, int64(defaultDelta), alloc.delta)
|
||||
assert.Equal(t, int64(defaultInitializedValue+n*defaultDelta), alloc.now.Load())
|
||||
}
|
|
@ -6,6 +6,10 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/milvus-io/milvus/cmd/tools/migration/legacy/legacypb"
|
||||
|
||||
"github.com/milvus-io/milvus/cmd/tools/migration/allocator"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
|
@ -195,14 +199,45 @@ func combineToCollectionIndexesMeta220(fieldIndexes FieldIndexes210, collectionI
|
|||
return indexes, nil
|
||||
}
|
||||
|
||||
func getOrFillBuildMeta(record *pb.SegmentIndexInfo, indexBuildMeta IndexBuildMeta210, alloc allocator.Allocator) (*legacypb.IndexMeta, error) {
|
||||
if record.GetBuildID() == 0 && !record.GetEnableIndex() {
|
||||
buildID, err := alloc.AllocID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buildMeta := &legacypb.IndexMeta{
|
||||
IndexBuildID: buildID,
|
||||
State: commonpb.IndexState_Finished,
|
||||
FailReason: "",
|
||||
Req: nil,
|
||||
IndexFilePaths: nil,
|
||||
MarkDeleted: false,
|
||||
NodeID: 0,
|
||||
IndexVersion: 1, // TODO: maybe a constraint is better.
|
||||
Recycled: false,
|
||||
SerializeSize: 0,
|
||||
}
|
||||
indexBuildMeta[buildID] = buildMeta
|
||||
return buildMeta, nil
|
||||
}
|
||||
buildMeta, ok := indexBuildMeta[record.GetBuildID()]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("index build meta not found, segment id: %d, index id: %d, index build id: %d",
|
||||
record.GetSegmentID(), record.GetIndexID(), record.GetBuildID())
|
||||
}
|
||||
return buildMeta, nil
|
||||
}
|
||||
|
||||
func combineToSegmentIndexesMeta220(segmentIndexes SegmentIndexesMeta210, indexBuildMeta IndexBuildMeta210) (SegmentIndexesMeta220, error) {
|
||||
alloc := allocator.NewAllocatorFromList(indexBuildMeta.GetAllBuildIDs(), false, true)
|
||||
|
||||
segmentIndexModels := make(SegmentIndexesMeta220)
|
||||
for segID := range segmentIndexes {
|
||||
for indexID := range segmentIndexes[segID] {
|
||||
record := segmentIndexes[segID][indexID]
|
||||
buildMeta, ok := indexBuildMeta[record.GetBuildID()]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("index build meta not found, segment id: %d, index id: %d, index build id: %d", segID, indexID, record.GetBuildID())
|
||||
buildMeta, err := getOrFillBuildMeta(record, indexBuildMeta, alloc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fileKeys := make([]string, len(buildMeta.GetIndexFilePaths()))
|
||||
|
|
|
@ -298,6 +298,14 @@ func (meta *IndexBuildMeta210) GenerateSaves() map[string]string {
|
|||
return kvs
|
||||
}
|
||||
|
||||
func (meta *IndexBuildMeta210) GetAllBuildIDs() []UniqueID {
|
||||
ret := make([]UniqueID, 0, len(*meta))
|
||||
for buildID := range *meta {
|
||||
ret = append(ret, buildID)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (meta *FieldIndexes210) AddRecord(collectionID UniqueID, fieldIndexes []*pb.FieldIndexInfo, schema *schemapb.CollectionSchema) {
|
||||
(*meta)[collectionID] = &FieldIndexesWithSchema{indexes: fieldIndexes, schema: schema}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue