mirror of https://github.com/milvus-io/milvus.git
168 lines
5.4 KiB
Go
168 lines
5.4 KiB
Go
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package datacoord
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
|
"github.com/milvus-io/milvus/internal/datacoord/task"
|
|
mocks2 "github.com/milvus-io/milvus/internal/metastore/mocks"
|
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
|
"github.com/milvus-io/milvus/internal/mocks"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/lock"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
func TestIndexInspector_inspect(t *testing.T) {
|
|
t.Run("normal test", func(t *testing.T) {
|
|
ctx := context.Background()
|
|
notifyChan := make(chan int64, 1)
|
|
scheduler := task.NewMockGlobalScheduler(t)
|
|
alloc := allocator.NewMockAllocator(t)
|
|
handler := NewNMockHandler(t)
|
|
storage := mocks.NewChunkManager(t)
|
|
versionManager := newIndexEngineVersionManager()
|
|
catalog := mocks2.NewDataCoordCatalog(t)
|
|
|
|
meta := &meta{
|
|
segments: NewSegmentsInfo(),
|
|
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
|
|
indexMeta: &indexMeta{
|
|
keyLock: lock.NewKeyLock[UniqueID](),
|
|
catalog: catalog,
|
|
segmentBuildInfo: newSegmentIndexBuildInfo(),
|
|
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
|
|
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
|
|
},
|
|
}
|
|
|
|
segment := &SegmentInfo{
|
|
SegmentInfo: &datapb.SegmentInfo{
|
|
ID: 1,
|
|
CollectionID: 2,
|
|
PartitionID: 3,
|
|
NumOfRows: 3000,
|
|
State: commonpb.SegmentState_Flushed,
|
|
IsSorted: true,
|
|
},
|
|
}
|
|
meta.segments.SetSegment(segment.GetID(), segment)
|
|
|
|
meta.indexMeta.indexes[2] = map[UniqueID]*model.Index{
|
|
5: {
|
|
CollectionID: 2,
|
|
FieldID: 101,
|
|
IndexID: 5,
|
|
IndexName: indexName,
|
|
},
|
|
}
|
|
|
|
inspector := newIndexInspector(ctx, notifyChan, meta, scheduler, alloc, handler, storage, versionManager)
|
|
|
|
inspector.Start()
|
|
defer inspector.Stop()
|
|
|
|
notifyChan <- segment.GetCollectionID()
|
|
|
|
alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil)
|
|
catalog.EXPECT().CreateSegmentIndex(mock.Anything, mock.Anything).Return(nil)
|
|
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil)
|
|
scheduler.EXPECT().Enqueue(mock.Anything).Run(func(_a0 task.Task) {
|
|
err := meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
|
|
SegmentID: segment.GetID(),
|
|
BuildID: segment.GetID(),
|
|
})
|
|
assert.NoError(t, err)
|
|
err = meta.indexMeta.FinishTask(&workerpb.IndexTaskInfo{
|
|
BuildID: segment.GetID(),
|
|
State: commonpb.IndexState_Finished,
|
|
})
|
|
assert.NoError(t, err)
|
|
})
|
|
|
|
assert.Eventually(t, func() bool {
|
|
return !meta.indexMeta.IsUnIndexedSegment(segment.GetCollectionID(), segment.GetID())
|
|
}, time.Second*10, time.Millisecond*10)
|
|
})
|
|
}
|
|
|
|
func TestIndexInspector_ReloadFromMeta(t *testing.T) {
|
|
ctx := context.Background()
|
|
notifyChan := make(chan int64, 1)
|
|
scheduler := task.NewMockGlobalScheduler(t)
|
|
alloc := allocator.NewMockAllocator(t)
|
|
handler := NewNMockHandler(t)
|
|
storage := mocks.NewChunkManager(t)
|
|
versionManager := newIndexEngineVersionManager()
|
|
catalog := mocks2.NewDataCoordCatalog(t)
|
|
|
|
meta := &meta{
|
|
segments: NewSegmentsInfo(),
|
|
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
|
|
indexMeta: &indexMeta{
|
|
keyLock: lock.NewKeyLock[UniqueID](),
|
|
catalog: catalog,
|
|
segmentBuildInfo: newSegmentIndexBuildInfo(),
|
|
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
|
|
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
|
|
},
|
|
}
|
|
|
|
inspector := newIndexInspector(ctx, notifyChan, meta, scheduler, alloc, handler, storage, versionManager)
|
|
|
|
catalog.EXPECT().CreateSegmentIndex(mock.Anything, mock.Anything).Return(nil)
|
|
|
|
seg1 := &SegmentInfo{
|
|
SegmentInfo: &datapb.SegmentInfo{
|
|
ID: 1,
|
|
CollectionID: 2,
|
|
State: commonpb.SegmentState_Flushed,
|
|
},
|
|
}
|
|
meta.segments.SetSegment(seg1.ID, seg1)
|
|
|
|
segIndex1 := &model.SegmentIndex{
|
|
SegmentID: seg1.ID,
|
|
IndexID: 3,
|
|
BuildID: 4,
|
|
IndexState: commonpb.IndexState_Unissued,
|
|
}
|
|
meta.indexMeta.AddSegmentIndex(ctx, segIndex1)
|
|
|
|
meta.indexMeta.indexes[2] = map[UniqueID]*model.Index{
|
|
3: {
|
|
CollectionID: 2,
|
|
FieldID: 100,
|
|
IndexID: 3,
|
|
IndexName: indexName,
|
|
},
|
|
}
|
|
|
|
scheduler.EXPECT().Enqueue(mock.Anything).Return()
|
|
inspector.reloadFromMeta()
|
|
}
|