mirror of https://github.com/milvus-io/milvus.git
fix: [2.5.4] Skip text index creation when segment is zero after sorting (#39978)
issue: #39961, master pr: #39962. Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Branch: pull/40011/head
parent
9b7ebb3d88
commit
8009af7ae2
|
@ -166,3 +166,20 @@ func generateTestData(collID, partID, segID int64, num int) ([]*Blob, error) {
|
|||
blobs, err := insertCodec.Serialize(partID, segID, data)
|
||||
return blobs, err
|
||||
}
|
||||
|
||||
func generateDeleteData(collID, partID, segID int64, num int) ([]*Blob, error) {
|
||||
pks := make([]storage.PrimaryKey, 0, num)
|
||||
tss := make([]storage.Timestamp, 0, num)
|
||||
for i := 1; i <= num; i++ {
|
||||
pks = append(pks, storage.NewInt64PrimaryKey(int64(i)))
|
||||
tss = append(tss, storage.Timestamp(i+1))
|
||||
}
|
||||
|
||||
deleteCodec := storage.NewDeleteCodec()
|
||||
blob, err := deleteCodec.Serialize(collID, partID, segID, &storage.DeleteData{
|
||||
Pks: pks,
|
||||
Tss: tss,
|
||||
RowCount: int64(num),
|
||||
})
|
||||
return []*Blob{blob}, err
|
||||
}
|
||||
|
|
|
@ -185,6 +185,7 @@ type IndexNodeSuite struct {
|
|||
logID int64
|
||||
numRows int64
|
||||
data []*Blob
|
||||
deleteData []*Blob
|
||||
in *IndexNode
|
||||
storageConfig *indexpb.StorageConfig
|
||||
cm storage.ChunkManager
|
||||
|
@ -205,7 +206,10 @@ func (s *IndexNodeSuite) SetupTest() {
|
|||
Params.MinioCfg.RootPath.SwapTempValue("indexnode-ut")
|
||||
|
||||
var err error
|
||||
s.data, err = generateTestData(s.collID, s.partID, s.segID, 3000)
|
||||
s.data, err = generateTestData(s.collID, s.partID, s.segID, int(s.numRows))
|
||||
s.NoError(err)
|
||||
|
||||
s.deleteData, err = generateDeleteData(s.collID, s.partID, s.segID, int(s.numRows))
|
||||
s.NoError(err)
|
||||
|
||||
s.storageConfig = &indexpb.StorageConfig{
|
||||
|
@ -250,6 +254,13 @@ func (s *IndexNodeSuite) SetupTest() {
|
|||
err = s.cm.Write(context.Background(), filePath, blob.GetValue())
|
||||
s.NoError(err)
|
||||
}
|
||||
for i, blob := range s.deleteData {
|
||||
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
|
||||
filePath, err := binlog.BuildLogPath(storage.DeleteBinlog, s.collID, s.partID, s.segID, fID, logID+int64(i))
|
||||
s.NoError(err)
|
||||
err = s.cm.Write(context.Background(), filePath, blob.GetValue())
|
||||
s.NoError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *IndexNodeSuite) TearDownSuite() {
|
||||
|
@ -688,4 +699,86 @@ func (s *IndexNodeSuite) Test_CreateStatsTask() {
|
|||
err = merr.Error(status)
|
||||
s.NoError(err)
|
||||
})
|
||||
|
||||
s.Run("all deleted", func() {
|
||||
deltaLogs := make([]*datapb.FieldBinlog, 0)
|
||||
for i := range s.deleteData {
|
||||
deltaLogs = append(deltaLogs, &datapb.FieldBinlog{
|
||||
Binlogs: []*datapb.Binlog{{
|
||||
EntriesNum: s.numRows,
|
||||
LogSize: int64(len(s.deleteData[0].GetValue())),
|
||||
MemorySize: s.deleteData[0].GetMemorySize(),
|
||||
LogID: s.logID + int64(i),
|
||||
TimestampFrom: 1,
|
||||
TimestampTo: 3001,
|
||||
}},
|
||||
})
|
||||
}
|
||||
taskID := int64(200)
|
||||
req := &workerpb.CreateStatsRequest{
|
||||
ClusterID: "cluster2",
|
||||
TaskID: taskID,
|
||||
CollectionID: s.collID,
|
||||
PartitionID: s.partID,
|
||||
InsertChannel: "ch1",
|
||||
SegmentID: s.segID,
|
||||
InsertLogs: fieldBinlogs,
|
||||
DeltaLogs: deltaLogs,
|
||||
StorageConfig: s.storageConfig,
|
||||
Schema: generateTestSchema(),
|
||||
TargetSegmentID: s.segID + 1,
|
||||
StartLogID: s.logID + 100,
|
||||
EndLogID: s.logID + 200,
|
||||
NumRows: s.numRows,
|
||||
BinlogMaxSize: 131000,
|
||||
SubJobType: indexpb.StatsSubJob_Sort,
|
||||
}
|
||||
|
||||
status, err := s.in.CreateJobV2(ctx, &workerpb.CreateJobV2Request{
|
||||
ClusterID: "cluster2",
|
||||
TaskID: taskID,
|
||||
JobType: indexpb.JobType_JobTypeStatsJob,
|
||||
Request: &workerpb.CreateJobV2Request_StatsRequest{
|
||||
StatsRequest: req,
|
||||
},
|
||||
})
|
||||
s.NoError(err)
|
||||
err = merr.Error(status)
|
||||
s.NoError(err)
|
||||
|
||||
for {
|
||||
resp, err := s.in.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{
|
||||
ClusterID: "cluster2",
|
||||
TaskIDs: []int64{taskID},
|
||||
JobType: indexpb.JobType_JobTypeStatsJob,
|
||||
})
|
||||
s.NoError(err)
|
||||
err = merr.Error(resp.GetStatus())
|
||||
s.NoError(err)
|
||||
s.Equal(1, len(resp.GetStatsJobResults().GetResults()))
|
||||
if resp.GetStatsJobResults().GetResults()[0].GetState() == indexpb.JobState_JobStateFinished {
|
||||
s.Zero(len(resp.GetStatsJobResults().GetResults()[0].GetInsertLogs()))
|
||||
s.Equal(int64(0), resp.GetStatsJobResults().GetResults()[0].GetNumRows())
|
||||
break
|
||||
}
|
||||
s.Equal(indexpb.JobState_JobStateInProgress, resp.GetStatsJobResults().GetResults()[0].GetState())
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
slotResp, err := s.in.GetJobStats(ctx, &workerpb.GetJobStatsRequest{})
|
||||
s.NoError(err)
|
||||
err = merr.Error(slotResp.GetStatus())
|
||||
s.NoError(err)
|
||||
|
||||
s.Equal(int64(1), slotResp.GetTaskSlots())
|
||||
|
||||
status, err = s.in.DropJobsV2(ctx, &workerpb.DropJobsV2Request{
|
||||
ClusterID: "cluster2",
|
||||
TaskIDs: []int64{taskID},
|
||||
JobType: indexpb.JobType_JobTypeStatsJob,
|
||||
})
|
||||
s.NoError(err)
|
||||
err = merr.Error(status)
|
||||
s.NoError(err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -319,6 +319,11 @@ func (st *statsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
if len(insertLogs) == 0 {
|
||||
log.Ctx(ctx).Info("there is no insertBinlogs, skip creating text index")
|
||||
return nil
|
||||
}
|
||||
|
||||
if st.req.GetSubJobType() == indexpb.StatsSubJob_Sort || st.req.GetSubJobType() == indexpb.StatsSubJob_TextIndexJob {
|
||||
err = st.createTextIndex(ctx,
|
||||
st.req.GetStorageConfig(),
|
||||
|
|
Loading…
Reference in New Issue