fix: Add length check for `storage.NewPrimaryKeyStats` (#28576)

See also #28575
Add a zero-length check to `storage.NewPrimaryKeyStats`: the function
now returns an error when a non-positive rowNum is passed.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2023-11-21 10:28:21 +08:00 committed by GitHub
parent c238bff9fb
commit 2b3fa8f67b
6 changed files with 106 additions and 16 deletions
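
Every call site touched below switches to the new two-value signature. A minimal caller-side sketch of that contract (illustrative only: `buildPkStats` and its parameter values are hypothetical, and the import paths assume the repository layout at this revision):

```go
package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

	"github.com/milvus-io/milvus/internal/storage"
)

// buildPkStats is a hypothetical helper showing how call sites now have to
// handle the error returned by storage.NewPrimaryKeyStats.
func buildPkStats(pkFieldID, rowNum int64) (*storage.PrimaryKeyStats, error) {
	stats, err := storage.NewPrimaryKeyStats(pkFieldID, int64(schemapb.DataType_Int64), rowNum)
	if err != nil {
		// rowNum <= 0 is rejected here instead of silently producing a stats
		// object backed by a zero-sized bloom filter.
		return nil, err
	}
	return stats, nil
}
```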


@@ -356,7 +356,10 @@ func (t *compactionTask) merge(
return nil, nil, 0, err
}
-stats := storage.NewPrimaryKeyStats(pkID, int64(pkType), oldRowNums)
+stats, err := storage.NewPrimaryKeyStats(pkID, int64(pkType), oldRowNums)
+if err != nil {
+return nil, nil, 0, err
+}
// initial timestampFrom, timestampTo = -1, -1 is an illegal value, only to mark initial state
var (
timestampTo int64 = -1


@@ -476,6 +476,59 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
assert.Equal(t, 0, len(statsPaths))
})
t.Run("merge_with_rownum_zero", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
metaCache := metacache.NewMockMetaCache(t)
metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe()
metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{
CollectionID: 1,
PartitionID: 0,
ID: id,
NumOfRows: 0,
}, nil)
return segment, true
})
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
assert.Equal(t, 1, binlogNum)
for idx := 0; idx < binlogNum; idx++ {
var ps []string
for _, path := range inpath {
ps = append(ps, path.GetBinlogs()[idx].GetLogPath())
}
allPaths = append(allPaths, ps)
}
dm := map[interface{}]Timestamp{
1: 10000,
}
ct := &compactionTask{
metaCache: metaCache,
downloader: mockbIO,
uploader: mockbIO,
done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1},
},
},
}
_, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
Schema: meta.GetSchema(),
}, dm)
assert.Error(t, err)
t.Log(err)
})
t.Run("Merge with meta error", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
@@ -696,14 +749,16 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
-stats := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10)
+stats, err := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10)
+require.NoError(t, err)
ct := &compactionTask{
uploader: &binlogIO{&mockCm{errSave: true}, alloc},
done: make(chan struct{}, 1),
}
-_, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil, nil)
+_, _, err = ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil, nil)
assert.Error(t, err)
})
})
@@ -872,14 +927,16 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 1,
}
-stats1 := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
+stats1, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
+require.NoError(t, err)
iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), c.segID1, c.parID, iData1, stats1, 2, meta)
require.NoError(t, err)
dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID1, c.parID, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths1))
-stats2 := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
+stats2, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
+require.NoError(t, err)
iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), c.segID2, c.parID, iData2, stats2, 2, meta)
require.NoError(t, err)
dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID2, c.parID, dData2, meta)
@@ -1008,14 +1065,16 @@ func TestCompactorInterfaceMethods(t *testing.T) {
RowCount: 0,
}
-stats1 := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
+stats1, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
+require.NoError(t, err)
iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), segID1, partID, iData1, stats1, 1, meta)
require.NoError(t, err)
dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), segID1, partID, dData1, meta)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths1))
-stats2 := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
+stats2, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
+require.NoError(t, err)
iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), segID2, partID, iData2, stats2, 1, meta)
require.NoError(t, err)
dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), segID2, partID, dData2, meta)


@@ -1117,7 +1117,8 @@ func genTestStat(meta *etcdpb.CollectionMeta) *storage.PrimaryKeyStats {
pkFieldType = int64(field.DataType)
}
}
-return storage.NewPrimaryKeyStats(pkFieldID, pkFieldType, 0)
+stats, _ := storage.NewPrimaryKeyStats(pkFieldID, pkFieldType, 100)
+return stats
}
func genInsertData() *InsertData {


@@ -237,11 +237,11 @@ func (t *SyncTask) convertInsertData2PkStats(pkFieldID int64, dataType schemapb.
pkFieldData := t.insertData.Data[pkFieldID]
rowNum := int64(pkFieldData.RowNum())
-if rowNum == 0 {
+stats, err := storage.NewPrimaryKeyStats(pkFieldID, int64(dataType), rowNum)
+if err != nil {
return nil, 0
}
-stats := storage.NewPrimaryKeyStats(pkFieldID, int64(dataType), rowNum)
stats.UpdateByMsgs(pkFieldData)
return stats, rowNum
}
@@ -310,11 +310,13 @@ func (t *SyncTask) serializePkStatsLog() error {
fieldID := pkField.GetFieldID()
if t.insertData != nil {
stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType())
+if stats != nil && rowNum > 0 {
err := t.serializeSinglePkStats(fieldID, stats, rowNum)
if err != nil {
return err
}
+}
}
if t.isFlush {
return t.serializeMergedPkStats(fieldID, pkField.GetDataType())


@@ -214,6 +214,28 @@ func (s *SyncTaskSuite) TestRunNormal() {
err := task.Run()
s.NoError(err)
})
s.Run("with_zero_numrow_insertdata", func() {
task := s.getSuiteSyncTask()
task.WithInsertData(s.getEmptyInsertBuffer())
task.WithFlush()
task.WithDrop()
task.WithMetaWriter(BrokerMetaWriter(s.broker))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err := task.Run()
s.Error(err)
err = task.serializePkStatsLog()
s.NoError(err)
stats, rowNum := task.convertInsertData2PkStats(100, schemapb.DataType_Int64)
s.Nil(stats)
s.Zero(rowNum)
})
}
func (s *SyncTaskSuite) TestRunError() {


@@ -190,12 +190,15 @@ func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey) {
}
}
-func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) *PrimaryKeyStats {
+func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) {
+if rowNum <= 0 {
+return nil, merr.WrapErrParameterInvalidMsg("non zero & non negative row num", rowNum)
+}
return &PrimaryKeyStats{
FieldID: fieldID,
PkType: pkType,
BF: bloom.NewWithEstimates(uint(rowNum), MaxBloomFalsePositive),
-}
+}, nil
}
// StatsWriter writes stats to buffer
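
For reference, the new validation can be exercised directly. A minimal test-style sketch (hypothetical test, not part of this change; it assumes the testify assertions and import paths already used throughout the repository):

```go
package storage_test

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)

// TestNewPrimaryKeyStatsRowNum covers both branches of the rowNum check.
func TestNewPrimaryKeyStatsRowNum(t *testing.T) {
	// A non-positive rowNum now yields an error and a nil stats object.
	stats, err := storage.NewPrimaryKeyStats(100, int64(schemapb.DataType_Int64), 0)
	assert.Error(t, err)
	assert.Nil(t, stats)

	// A positive rowNum still returns a usable PrimaryKeyStats.
	stats, err = storage.NewPrimaryKeyStats(100, int64(schemapb.DataType_Int64), 8)
	assert.NoError(t, err)
	assert.NotNil(t, stats)
}
```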