mirror of https://github.com/milvus-io/milvus.git
384 lines
11 KiB
Go
384 lines
11 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package datacoord
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/suite"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
|
"github.com/milvus-io/milvus/internal/datacoord/session"
|
|
"github.com/milvus-io/milvus/internal/datacoord/task"
|
|
"github.com/milvus-io/milvus/internal/metastore/mocks"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/lock"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
type statsInspectorSuite struct {
|
|
suite.Suite
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
mt *meta
|
|
alloc *allocator.MockAllocator
|
|
catalog *mocks.DataCoordCatalog
|
|
cluster session.Cluster
|
|
scheduler task.GlobalScheduler
|
|
inspector *statsInspector
|
|
}
|
|
|
|
// Test_statsInspectorSuite is the `go test` entry point; it runs the Test*
// methods of statsInspectorSuite as subtests.
func Test_statsInspectorSuite(t *testing.T) {
	suite.Run(t, &statsInspectorSuite{})
}
|
|
|
|
// SetupTest builds a fresh fixture before every test method:
//   - TaskCheckInterval and GCInterval are set to "1" so the background loops
//     tick quickly,
//   - an allocator mock hands out increasing IDs starting at 1001,
//   - a catalog mock accepts SaveStatsTask/DropStatsTask and lists no tasks,
//   - meta holds collection 1 (Int64 field 100 "pk", VarChar field 101 "var"
//     with enable_match/enable_analyzer) and three segments in partition 2:
//     10 (flushed, unsorted), 20 (flushed, sorted) and 30 (flushing),
//   - a global-scheduler mock accepts Enqueue/AbortAndRemoveTask,
//   - the statsInspector under test is wired to the above.
func (s *statsInspectorSuite) SetupTest() {
	s.ctx, s.cancel = context.WithCancel(context.Background())

	// Short intervals so the trigger/cleanup loops fire within the tests' sleeps.
	Params.Save(Params.DataCoordCfg.TaskCheckInterval.Key, "1")
	Params.Save(Params.DataCoordCfg.GCInterval.Key, "1")

	s.alloc = allocator.NewMockAllocator(s.T())
	var idCounter int64 = 1000
	s.alloc.EXPECT().AllocID(mock.Anything).RunAndReturn(func(ctx context.Context) (int64, error) {
		idCounter++
		return idCounter, nil
	}).Maybe()

	s.catalog = mocks.NewDataCoordCatalog(s.T())
	s.catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil).Maybe()
	s.catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil).Maybe()
	s.catalog.EXPECT().ListStatsTasks(mock.Anything).Return([]*indexpb.StatsTask{}, nil).Maybe()

	collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
	collections.Insert(1, &collectionInfo{
		ID: 1,
		Schema: &schemapb.CollectionSchema{
			Fields: []*schemapb.FieldSchema{
				{
					FieldID:  100,
					Name:     "pk",
					DataType: schemapb.DataType_Int64,
				},
				{
					FieldID:  101,
					Name:     "var",
					DataType: schemapb.DataType_VarChar,
					TypeParams: []*commonpb.KeyValuePair{
						{
							Key: "enable_match", Value: "true",
						},
						{
							Key: "enable_analyzer", Value: "true",
						},
					},
				},
			},
		},
	})

	// newSegment builds one fixture segment of collection 1 / partition 2.
	newSegment := func(id UniqueID, sorted bool, state commonpb.SegmentState) *SegmentInfo {
		return &SegmentInfo{
			SegmentInfo: &datapb.SegmentInfo{
				ID:           id,
				CollectionID: 1,
				PartitionID:  2,
				IsSorted:     sorted,
				State:        state,
				NumOfRows:    1000,
				MaxRowNum:    2000,
				Level:        2,
			},
		}
	}
	segments := map[UniqueID]*SegmentInfo{
		10: newSegment(10, false, commonpb.SegmentState_Flushed),
		20: newSegment(20, true, commonpb.SegmentState_Flushed),
		30: newSegment(30, false, commonpb.SegmentState_Flushing),
	}

	// Index the very same *SegmentInfo values by collection rather than deep
	// copies: tests mutate s.mt.segments.segments[id], and those mutations must
	// also be visible to lookups that go through coll2Segments.
	coll2Segments := make(map[UniqueID]map[UniqueID]*SegmentInfo)
	for id, seg := range segments {
		collID := seg.GetCollectionID()
		if coll2Segments[collID] == nil {
			coll2Segments[collID] = make(map[UniqueID]*SegmentInfo)
		}
		coll2Segments[collID][id] = seg
	}

	s.mt = &meta{
		collections: collections,
		segments: &SegmentsInfo{
			segments: segments,
			secondaryIndexes: segmentInfoIndexes{
				coll2Segments: coll2Segments,
			},
		},
		statsTaskMeta: &statsTaskMeta{
			ctx:             s.ctx,
			catalog:         s.catalog,
			keyLock:         lock.NewKeyLock[UniqueID](),
			tasks:           typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
			segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
		},
	}
	s.cluster = session.NewMockCluster(s.T())

	gs := task.NewMockGlobalScheduler(s.T())
	gs.EXPECT().Enqueue(mock.Anything).Return().Maybe()
	gs.EXPECT().AbortAndRemoveTask(mock.Anything).Return().Maybe()
	s.scheduler = gs

	s.inspector = newStatsInspector(
		s.ctx,
		s.mt,
		s.scheduler,
		s.alloc,
		nil,
		nil,
		newIndexEngineVersionManager(),
	)
}
|
|
|
|
func (s *statsInspectorSuite) TearDownTest() {
|
|
s.cancel()
|
|
}
|
|
|
|
// TestStart starts the inspector, stops it, and checks that Stop leaves the
// inspector's context cancelled.
func (s *statsInspectorSuite) TestStart() {
	s.inspector.Start()
	time.Sleep(10 * time.Millisecond) // Give goroutines some time to start

	s.inspector.Stop()

	// ctx.Done() is non-nil for any cancellable context, cancelled or not, so
	// comparing it with nil proves nothing. Check that the channel is closed.
	select {
	case <-s.inspector.ctx.Done():
	default:
		s.Fail("Context should be cancelled")
	}
}
|
|
|
|
func (s *statsInspectorSuite) TestSubmitStatsTask() {
|
|
// Test submitting a valid stats task
|
|
err := s.inspector.SubmitStatsTask(10, 10, indexpb.StatsSubJob_Sort, true)
|
|
s.NoError(err)
|
|
|
|
// Test submitting a task for non-existent segment
|
|
err = s.inspector.SubmitStatsTask(999, 999, indexpb.StatsSubJob_Sort, true)
|
|
s.Error(err)
|
|
s.True(errors.Is(err, merr.ErrSegmentNotFound), "Error should be ErrSegmentNotFound")
|
|
|
|
s.mt.statsTaskMeta.tasks.Insert(1001, &indexpb.StatsTask{
|
|
TaskID: 1001,
|
|
SegmentID: 10,
|
|
SubJobType: indexpb.StatsSubJob_Sort,
|
|
})
|
|
s.mt.statsTaskMeta.segmentID2Tasks.Insert("10-Sort", &indexpb.StatsTask{
|
|
TaskID: 1001,
|
|
SegmentID: 10,
|
|
SubJobType: indexpb.StatsSubJob_Sort,
|
|
})
|
|
|
|
// Simulate duplicate task error
|
|
err = s.inspector.SubmitStatsTask(10, 10, indexpb.StatsSubJob_Sort, true)
|
|
s.NoError(err) // Duplicate tasks are handled as success
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestGetStatsTask() {
|
|
// Set up mock task
|
|
s.mt.statsTaskMeta.tasks.Insert(1002, &indexpb.StatsTask{
|
|
TaskID: 1002,
|
|
SegmentID: 10,
|
|
SubJobType: indexpb.StatsSubJob_Sort,
|
|
State: indexpb.JobState_JobStateInProgress,
|
|
})
|
|
s.mt.statsTaskMeta.segmentID2Tasks.Insert("10-Sort", &indexpb.StatsTask{
|
|
TaskID: 1002,
|
|
SegmentID: 10,
|
|
SubJobType: indexpb.StatsSubJob_Sort,
|
|
State: indexpb.JobState_JobStateInProgress,
|
|
})
|
|
|
|
// Test retrieving task
|
|
task := s.inspector.GetStatsTask(10, indexpb.StatsSubJob_Sort)
|
|
s.NotNil(task)
|
|
s.Equal(int64(1002), task.TaskID)
|
|
s.Equal(indexpb.JobState_JobStateInProgress, task.State)
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestDropStatsTask() {
|
|
// Set up mock task
|
|
s.mt.statsTaskMeta.tasks.Insert(1003, &indexpb.StatsTask{
|
|
TaskID: 1003,
|
|
SegmentID: 10,
|
|
SubJobType: indexpb.StatsSubJob_Sort,
|
|
})
|
|
s.mt.statsTaskMeta.segmentID2Tasks.Insert("10-Sort", &indexpb.StatsTask{
|
|
TaskID: 1003,
|
|
SegmentID: 10,
|
|
SubJobType: indexpb.StatsSubJob_Sort,
|
|
})
|
|
|
|
// Test dropping task
|
|
err := s.inspector.DropStatsTask(10, indexpb.StatsSubJob_Sort)
|
|
s.NoError(err)
|
|
|
|
// Test dropping non-existent task
|
|
err = s.inspector.DropStatsTask(999, indexpb.StatsSubJob_Sort)
|
|
s.NoError(err) // Non-existent tasks should return success
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestTriggerTextStatsTask() {
|
|
// Set up a sorted segment without text index
|
|
segment := s.mt.segments.segments[20]
|
|
segment.IsSorted = true
|
|
segment.TextStatsLogs = nil
|
|
|
|
// Test triggering text index stats task
|
|
s.inspector.triggerTextStatsTask()
|
|
|
|
// Verify task creation
|
|
s.alloc.AssertCalled(s.T(), "AllocID", mock.Anything)
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestTriggerBM25StatsTask() {
|
|
// BM25 functionality is disabled in current version
|
|
s.inspector.triggerBM25StatsTask()
|
|
|
|
// No tasks should be created
|
|
// Because enableBM25() returns false
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestTriggerStatsTaskLoop() {
|
|
// Simulate short-term triggerStatsTaskLoop execution
|
|
s.inspector.loopWg.Add(1)
|
|
go s.inspector.triggerStatsTaskLoop()
|
|
|
|
// Wait for goroutine to execute
|
|
time.Sleep(1500 * time.Millisecond)
|
|
|
|
// Send a segment ID to the channel
|
|
getStatsTaskChSingleton() <- 10
|
|
|
|
// Wait for processing
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
// Stop the loop
|
|
s.cancel()
|
|
s.inspector.loopWg.Wait()
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestCleanupStatsTasksLoop() {
|
|
// Simulate short-term cleanupStatsTasksLoop execution
|
|
s.inspector.loopWg.Add(1)
|
|
go s.inspector.cleanupStatsTasksLoop()
|
|
|
|
// Wait for GC to execute
|
|
time.Sleep(1500 * time.Millisecond)
|
|
|
|
// Stop the loop
|
|
s.cancel()
|
|
s.inspector.loopWg.Wait()
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestReloadFromMeta() {
|
|
// Set up some existing tasks
|
|
s.mt.statsTaskMeta.tasks.Insert(1005, &indexpb.StatsTask{
|
|
TaskID: 1005,
|
|
SegmentID: 10,
|
|
SubJobType: indexpb.StatsSubJob_Sort,
|
|
State: indexpb.JobState_JobStateInProgress,
|
|
})
|
|
|
|
// Test reloading
|
|
s.inspector.reloadFromMeta()
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestNeedDoTextIndex() {
|
|
// Test case when text index is needed
|
|
segment := s.mt.segments.segments[20]
|
|
segment.IsSorted = true
|
|
result := needDoTextIndex(segment, []int64{101})
|
|
s.True(result, "Segment should need text index")
|
|
|
|
// Test case when text index already exists
|
|
segment.TextStatsLogs = map[int64]*datapb.TextIndexStats{
|
|
101: {},
|
|
}
|
|
result = needDoTextIndex(segment, []int64{101})
|
|
s.False(result, "Segment should not need text index")
|
|
|
|
// Test case with unsorted segment
|
|
segment.IsSorted = false
|
|
result = needDoTextIndex(segment, []int64{101})
|
|
s.False(result, "Unsorted segment should not need text index")
|
|
}
|
|
|
|
func (s *statsInspectorSuite) TestEnableBM25() {
|
|
// Test if BM25 is enabled
|
|
result := s.inspector.enableBM25()
|
|
s.False(result, "BM25 should be disabled by default")
|
|
}
|