milvus/internal/datacoord/stats_inspector_test.go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"testing"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/datacoord/allocator"
	"github.com/milvus-io/milvus/internal/datacoord/session"
	"github.com/milvus-io/milvus/internal/datacoord/task"
	"github.com/milvus-io/milvus/internal/metastore/mocks"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
	"github.com/milvus-io/milvus/pkg/v2/util/lock"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
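
// statsInspectorSuite wires a statsInspector to an in-memory meta with mocked
// catalog, allocator, cluster, and global scheduler dependencies.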
type statsInspectorSuite struct {
	suite.Suite

	ctx       context.Context
	cancel    context.CancelFunc
	mt        *meta
	alloc     *allocator.MockAllocator
	catalog   *mocks.DataCoordCatalog
	cluster   session.Cluster
	scheduler task.GlobalScheduler
	inspector *statsInspector
}

func Test_statsInspectorSuite(t *testing.T) {
	suite.Run(t, new(statsInspectorSuite))
}

func (s *statsInspectorSuite) SetupTest() {
	s.ctx, s.cancel = context.WithCancel(context.Background())

	// Shorten the task-check and GC intervals so the loop tests below only need short sleeps.
	Params.Save(Params.DataCoordCfg.TaskCheckInterval.Key, "1")
	Params.Save(Params.DataCoordCfg.GCInterval.Key, "1")

	s.alloc = allocator.NewMockAllocator(s.T())
	var idCounter int64 = 1000
	s.alloc.EXPECT().AllocID(mock.Anything).RunAndReturn(func(ctx context.Context) (int64, error) {
		idCounter++
		return idCounter, nil
	}).Maybe()

	s.catalog = mocks.NewDataCoordCatalog(s.T())
	s.catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil).Maybe()
	s.catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil).Maybe()
	s.catalog.EXPECT().ListStatsTasks(mock.Anything).Return([]*indexpb.StatsTask{}, nil).Maybe()

	collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
	collections.Insert(1, &collectionInfo{
		ID: 1,
		Schema: &schemapb.CollectionSchema{
			Fields: []*schemapb.FieldSchema{
				{
					FieldID:  100,
					Name:     "pk",
					DataType: schemapb.DataType_Int64,
				},
				{
					FieldID:  101,
					Name:     "var",
					DataType: schemapb.DataType_VarChar,
					TypeParams: []*commonpb.KeyValuePair{
						{
							Key: "enable_match", Value: "true",
						},
						{
							Key: "enable_analyzer", Value: "true",
						},
					},
				},
			},
		},
	})
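
	// The meta below holds three segments in collection 1: segment 10 is flushed but
	// unsorted, segment 20 is flushed and already sorted, and segment 30 is still
	// flushing. secondaryIndexes mirrors the same segments keyed by collection ID.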
	s.mt = &meta{
		collections: collections,
		segments: &SegmentsInfo{
			segments: map[UniqueID]*SegmentInfo{
				10: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:           10,
						CollectionID: 1,
						PartitionID:  2,
						IsSorted:     false,
						State:        commonpb.SegmentState_Flushed,
						NumOfRows:    1000,
						MaxRowNum:    2000,
						Level:        2,
					},
				},
				20: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:           20,
						CollectionID: 1,
						PartitionID:  2,
						IsSorted:     true,
						State:        commonpb.SegmentState_Flushed,
						NumOfRows:    1000,
						MaxRowNum:    2000,
						Level:        2,
					},
				},
				30: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:           30,
						CollectionID: 1,
						PartitionID:  2,
						State:        commonpb.SegmentState_Flushing,
						NumOfRows:    1000,
						MaxRowNum:    2000,
						Level:        2,
					},
				},
			},
			secondaryIndexes: segmentInfoIndexes{
				coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
					1: {
						10: {
							SegmentInfo: &datapb.SegmentInfo{
								ID:           10,
								CollectionID: 1,
								PartitionID:  2,
								IsSorted:     false,
								State:        commonpb.SegmentState_Flushed,
								NumOfRows:    1000,
								MaxRowNum:    2000,
								Level:        2,
							},
						},
						20: {
							SegmentInfo: &datapb.SegmentInfo{
								ID:           20,
								CollectionID: 1,
								PartitionID:  2,
								IsSorted:     true,
								State:        commonpb.SegmentState_Flushed,
								NumOfRows:    1000,
								MaxRowNum:    2000,
								Level:        2,
							},
						},
						30: {
							SegmentInfo: &datapb.SegmentInfo{
								ID:           30,
								CollectionID: 1,
								PartitionID:  2,
								State:        commonpb.SegmentState_Flushing,
								NumOfRows:    1000,
								MaxRowNum:    2000,
								Level:        2,
							},
						},
					},
				},
			},
		},
		statsTaskMeta: &statsTaskMeta{
			ctx:             s.ctx,
			catalog:         s.catalog,
			keyLock:         lock.NewKeyLock[UniqueID](),
			tasks:           typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
			segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
		},
	}
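
	// The cluster and global scheduler are mocks; the scheduler simply accepts
	// Enqueue and AbortAndRemoveTask calls without running real tasks.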
	s.cluster = session.NewMockCluster(s.T())
	gs := task.NewMockGlobalScheduler(s.T())
	gs.EXPECT().Enqueue(mock.Anything).Return().Maybe()
	gs.EXPECT().AbortAndRemoveTask(mock.Anything).Return().Maybe()
	s.scheduler = gs

	s.inspector = newStatsInspector(
		s.ctx,
		s.mt,
		s.scheduler,
		s.alloc,
		nil,
		nil,
		newIndexEngineVersionManager(),
	)
}

func (s *statsInspectorSuite) TearDownTest() {
	s.cancel()
}

func (s *statsInspectorSuite) TestStart() {
	s.inspector.Start()
	time.Sleep(10 * time.Millisecond) // Give the background goroutines some time to start
	s.inspector.Stop()
	s.Error(s.inspector.ctx.Err(), "Context should be cancelled after Stop")
}

func (s *statsInspectorSuite) TestSubmitStatsTask() {
	// Submitting a stats task for an existing segment should succeed
	err := s.inspector.SubmitStatsTask(10, 10, indexpb.StatsSubJob_Sort, true)
	s.NoError(err)

	// Submitting a task for a non-existent segment should fail
	err = s.inspector.SubmitStatsTask(999, 999, indexpb.StatsSubJob_Sort, true)
	s.Error(err)
	s.True(errors.Is(err, merr.ErrSegmentNotFound), "Error should be ErrSegmentNotFound")

	// Register an existing Sort task for segment 10 (segmentID2Tasks is keyed as "<segmentID>-<subJob>")
	s.mt.statsTaskMeta.tasks.Insert(1001, &indexpb.StatsTask{
		TaskID:     1001,
		SegmentID:  10,
		SubJobType: indexpb.StatsSubJob_Sort,
	})
	s.mt.statsTaskMeta.segmentID2Tasks.Insert("10-Sort", &indexpb.StatsTask{
		TaskID:     1001,
		SegmentID:  10,
		SubJobType: indexpb.StatsSubJob_Sort,
	})

	// Submitting a duplicate task is treated as success
	err = s.inspector.SubmitStatsTask(10, 10, indexpb.StatsSubJob_Sort, true)
	s.NoError(err)
}

func (s *statsInspectorSuite) TestGetStatsTask() {
	// Set up mock task
	s.mt.statsTaskMeta.tasks.Insert(1002, &indexpb.StatsTask{
		TaskID:     1002,
		SegmentID:  10,
		SubJobType: indexpb.StatsSubJob_Sort,
		State:      indexpb.JobState_JobStateInProgress,
	})
	s.mt.statsTaskMeta.segmentID2Tasks.Insert("10-Sort", &indexpb.StatsTask{
		TaskID:     1002,
		SegmentID:  10,
		SubJobType: indexpb.StatsSubJob_Sort,
		State:      indexpb.JobState_JobStateInProgress,
	})

	// Retrieving the task should return the in-progress Sort task
	statsTask := s.inspector.GetStatsTask(10, indexpb.StatsSubJob_Sort)
	s.NotNil(statsTask)
	s.Equal(int64(1002), statsTask.TaskID)
	s.Equal(indexpb.JobState_JobStateInProgress, statsTask.State)
}

func (s *statsInspectorSuite) TestDropStatsTask() {
	// Set up mock task
	s.mt.statsTaskMeta.tasks.Insert(1003, &indexpb.StatsTask{
		TaskID:     1003,
		SegmentID:  10,
		SubJobType: indexpb.StatsSubJob_Sort,
	})
	s.mt.statsTaskMeta.segmentID2Tasks.Insert("10-Sort", &indexpb.StatsTask{
		TaskID:     1003,
		SegmentID:  10,
		SubJobType: indexpb.StatsSubJob_Sort,
	})

	// Test dropping task
	err := s.inspector.DropStatsTask(10, indexpb.StatsSubJob_Sort)
	s.NoError(err)

	// Test dropping non-existent task
	err = s.inspector.DropStatsTask(999, indexpb.StatsSubJob_Sort)
	s.NoError(err) // Non-existent tasks should return success
}
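
// TestTriggerTextStatsTask depends on the VarChar field 101 declared with
// enable_match/enable_analyzer in SetupTest: triggering on a sorted segment
// without text stats is expected to allocate a new task ID (checked via AllocID).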
func (s *statsInspectorSuite) TestTriggerTextStatsTask() {
	// Set up a sorted segment without text index
	segment := s.mt.segments.segments[20]
	segment.IsSorted = true
	segment.TextStatsLogs = nil

	// Trigger text index stats task generation
	s.inspector.triggerTextStatsTask()

	// Verify that a task ID was allocated for the new task
	s.alloc.AssertCalled(s.T(), "AllocID", mock.Anything)
}

func (s *statsInspectorSuite) TestTriggerBM25StatsTask() {
	// BM25 functionality is disabled in the current version, so no tasks
	// should be created: enableBM25() returns false.
	s.inspector.triggerBM25StatsTask()
}

func (s *statsInspectorSuite) TestTriggerStatsTaskLoop() {
	// Run triggerStatsTaskLoop briefly
	s.inspector.loopWg.Add(1)
	go s.inspector.triggerStatsTaskLoop()

	// Wait for the loop to tick at least once
	time.Sleep(1500 * time.Millisecond)

	// Send a segment ID to the channel
	getStatsTaskChSingleton() <- 10

	// Wait for processing
	time.Sleep(500 * time.Millisecond)

	// Stop the loop
	s.cancel()
	s.inspector.loopWg.Wait()
}

func (s *statsInspectorSuite) TestCleanupStatsTasksLoop() {
	// Run cleanupStatsTasksLoop briefly
	s.inspector.loopWg.Add(1)
	go s.inspector.cleanupStatsTasksLoop()

	// Wait for at least one GC pass
	time.Sleep(1500 * time.Millisecond)

	// Stop the loop
	s.cancel()
	s.inspector.loopWg.Wait()
}

func (s *statsInspectorSuite) TestReloadFromMeta() {
	// Set up some existing tasks
	s.mt.statsTaskMeta.tasks.Insert(1005, &indexpb.StatsTask{
		TaskID:     1005,
		SegmentID:  10,
		SubJobType: indexpb.StatsSubJob_Sort,
		State:      indexpb.JobState_JobStateInProgress,
	})

	// Test reloading
	s.inspector.reloadFromMeta()
}

func (s *statsInspectorSuite) TestNeedDoTextIndex() {
	// Test case when text index is needed
	segment := s.mt.segments.segments[20]
	segment.IsSorted = true
	result := needDoTextIndex(segment, []int64{101})
	s.True(result, "Segment should need text index")

	// Test case when text index already exists
	segment.TextStatsLogs = map[int64]*datapb.TextIndexStats{
		101: {},
	}
	result = needDoTextIndex(segment, []int64{101})
	s.False(result, "Segment should not need text index")

	// Test case with unsorted segment
	segment.IsSorted = false
	result = needDoTextIndex(segment, []int64{101})
	s.False(result, "Unsorted segment should not need text index")
}

func (s *statsInspectorSuite) TestEnableBM25() {
	// Test if BM25 is enabled
	result := s.inspector.enableBM25()
	s.False(result, "BM25 should be disabled by default")
}