From 2b3fa8f67b4e1babd4a938badc86404d2c9fbb52 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 21 Nov 2023 10:28:21 +0800 Subject: [PATCH] fix: Add length check for `storage.NewPrimaryKeyStats` (#28576) See also #28575 Add zero-length check for `storage.NewPrimaryKeyStats`. This function shall return error when non-positive rowNum passed. Signed-off-by: Congqi Xia --- internal/datanode/compactor.go | 5 +- internal/datanode/compactor_test.go | 71 +++++++++++++++++++++++--- internal/datanode/mock_test.go | 3 +- internal/datanode/syncmgr/task.go | 14 ++--- internal/datanode/syncmgr/task_test.go | 22 ++++++++ internal/storage/stats.go | 7 ++- 6 files changed, 106 insertions(+), 16 deletions(-) diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 7fa4453103..a7295dbc2f 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -356,7 +356,10 @@ func (t *compactionTask) merge( return nil, nil, 0, err } - stats := storage.NewPrimaryKeyStats(pkID, int64(pkType), oldRowNums) + stats, err := storage.NewPrimaryKeyStats(pkID, int64(pkType), oldRowNums) + if err != nil { + return nil, nil, 0, err + } // initial timestampFrom, timestampTo = -1, -1 is an illegal value, only to mark initial state var ( timestampTo int64 = -1 diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index e52ff5f738..6ee4a3b44c 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -476,6 +476,59 @@ func TestCompactionTaskInnerMethods(t *testing.T) { assert.Equal(t, 0, len(statsPaths)) }) + t.Run("merge_with_rownum_zero", func(t *testing.T) { + mockbIO := &binlogIO{cm, alloc} + iData := genInsertDataWithExpiredTS() + meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) + metaCache := metacache.NewMockMetaCache(t) + metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe() + metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { + segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + CollectionID: 1, + PartitionID: 0, + ID: id, + NumOfRows: 0, + }, nil) + return segment, true + }) + + var allPaths [][]string + inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta) + assert.NoError(t, err) + assert.Equal(t, 12, len(inpath)) + binlogNum := len(inpath[0].GetBinlogs()) + assert.Equal(t, 1, binlogNum) + + for idx := 0; idx < binlogNum; idx++ { + var ps []string + for _, path := range inpath { + ps = append(ps, path.GetBinlogs()[idx].GetLogPath()) + } + allPaths = append(allPaths, ps) + } + + dm := map[interface{}]Timestamp{ + 1: 10000, + } + + ct := &compactionTask{ + metaCache: metaCache, + downloader: mockbIO, + uploader: mockbIO, + done: make(chan struct{}, 1), + plan: &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 1}, + }, + }, + } + _, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{ + Schema: meta.GetSchema(), + }, dm) + assert.Error(t, err) + t.Log(err) + }) + t.Run("Merge with meta error", func(t *testing.T) { mockbIO := &binlogIO{cm, alloc} paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") @@ -696,14 +749,16 @@ func TestCompactionTaskInnerMethods(t *testing.T) { alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64) - stats := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10) + stats, err := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10) + + require.NoError(t, err) ct := &compactionTask{ uploader: &binlogIO{&mockCm{errSave: true}, alloc}, done: make(chan struct{}, 1), } - _, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil, nil) + _, _, err = ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil, nil) assert.Error(t, err) }) }) @@ -872,14 +927,16 @@ func TestCompactorInterfaceMethods(t *testing.T) { RowCount: 1, } - stats1 := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1) + stats1, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1) + require.NoError(t, err) iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), c.segID1, c.parID, iData1, stats1, 2, meta) require.NoError(t, err) dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID1, c.parID, dData1, meta) require.NoError(t, err) require.Equal(t, 12, len(iPaths1)) - stats2 := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1) + stats2, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1) + require.NoError(t, err) iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), c.segID2, c.parID, iData2, stats2, 2, meta) require.NoError(t, err) dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID2, c.parID, dData2, meta) @@ -1008,14 +1065,16 @@ func TestCompactorInterfaceMethods(t *testing.T) { RowCount: 0, } - stats1 := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1) + stats1, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1) + require.NoError(t, err) iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), segID1, partID, iData1, stats1, 1, meta) require.NoError(t, err) dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), segID1, partID, dData1, meta) require.NoError(t, err) require.Equal(t, 12, len(iPaths1)) - stats2 := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1) + stats2, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1) + require.NoError(t, err) iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), segID2, partID, iData2, stats2, 1, meta) require.NoError(t, err) dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), segID2, partID, dData2, meta) diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 59e661dde9..66aec069e3 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -1117,7 +1117,8 @@ func genTestStat(meta *etcdpb.CollectionMeta) *storage.PrimaryKeyStats { pkFieldType = int64(field.DataType) } } - return storage.NewPrimaryKeyStats(pkFieldID, pkFieldType, 0) + stats, _ := storage.NewPrimaryKeyStats(pkFieldID, pkFieldType, 100) + return stats } func genInsertData() *InsertData { diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index 21617752df..4963fd06f3 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -237,11 +237,11 @@ func (t *SyncTask) convertInsertData2PkStats(pkFieldID int64, dataType schemapb. pkFieldData := t.insertData.Data[pkFieldID] rowNum := int64(pkFieldData.RowNum()) - if rowNum == 0 { + + stats, err := storage.NewPrimaryKeyStats(pkFieldID, int64(dataType), rowNum) + if err != nil { return nil, 0 } - - stats := storage.NewPrimaryKeyStats(pkFieldID, int64(dataType), rowNum) stats.UpdateByMsgs(pkFieldData) return stats, rowNum } @@ -310,9 +310,11 @@ func (t *SyncTask) serializePkStatsLog() error { fieldID := pkField.GetFieldID() if t.insertData != nil { stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType()) - err := t.serializeSinglePkStats(fieldID, stats, rowNum) - if err != nil { - return err + if stats != nil && rowNum > 0 { + err := t.serializeSinglePkStats(fieldID, stats, rowNum) + if err != nil { + return err + } } } diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go index b9091f505f..5e2da28e9b 100644 --- a/internal/datanode/syncmgr/task_test.go +++ b/internal/datanode/syncmgr/task_test.go @@ -214,6 +214,28 @@ func (s *SyncTaskSuite) TestRunNormal() { err := task.Run() s.NoError(err) }) + + s.Run("with_zero_numrow_insertdata", func() { + task := s.getSuiteSyncTask() + task.WithInsertData(s.getEmptyInsertBuffer()) + task.WithFlush() + task.WithDrop() + task.WithMetaWriter(BrokerMetaWriter(s.broker)) + task.WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + }) + + err := task.Run() + s.Error(err) + + err = task.serializePkStatsLog() + s.NoError(err) + stats, rowNum := task.convertInsertData2PkStats(100, schemapb.DataType_Int64) + s.Nil(stats) + s.Zero(rowNum) + }) } func (s *SyncTaskSuite) TestRunError() { diff --git a/internal/storage/stats.go b/internal/storage/stats.go index 19522a0423..f4792754e3 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -190,12 +190,15 @@ func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey) { } } -func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) *PrimaryKeyStats { +func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) { + if rowNum <= 0 { + return nil, merr.WrapErrParameterInvalidMsg("non zero & non negative row num", rowNum) + } return &PrimaryKeyStats{ FieldID: fieldID, PkType: pkType, BF: bloom.NewWithEstimates(uint(rowNum), MaxBloomFalsePositive), - } + }, nil } // StatsWriter writes stats to buffer