From f745d7f48916ee2cc5024c2de2b305d6893040aa Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 1 Dec 2022 20:33:17 +0800 Subject: [PATCH] Fix compaction target segment rowNum is always 0 (#20937) Signed-off-by: Congqi Xia Signed-off-by: Congqi Xia --- internal/datanode/binlog_io.go | 4 ++-- internal/storage/data_codec.go | 18 +++++++++++------- internal/storage/index_data_codec_test.go | 18 +++++++++--------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index 629b7e8970..65f9adc721 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -285,7 +285,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta kvs[key] = value inpaths[fID] = &datapb.FieldBinlog{ FieldID: fID, - Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}}, + Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: blob.RowNum}}, } } @@ -302,7 +302,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta kvs[key] = value statspaths[fID] = &datapb.FieldBinlog{ FieldID: fID, - Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}}, + Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: blob.RowNum}}, } } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index f2d39f1d25..ea7b292a17 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -64,9 +64,10 @@ const InvalidUniqueID = UniqueID(-1) // Blob is a pack of key&value type Blob struct { - Key string - Value []byte - Size int64 + Key string + Value []byte + Size int64 + RowNum int64 } // BlobList implements sort.Interface for a list of Blob @@ -277,6 +278,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if timeFieldData.RowNum() <= 0 { return nil, nil, fmt.Errorf("there's no data in InsertData") } + rowNum := int64(timeFieldData.RowNum()) ts := timeFieldData.(*Int64FieldData).Data startTs := ts[0] @@ -420,8 +422,9 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique } blobKey := fmt.Sprintf("%d", field.FieldID) blobs = append(blobs, &Blob{ - Key: blobKey, - Value: buffer, + Key: blobKey, + Value: buffer, + RowNum: rowNum, }) eventWriter.Close() writer.Close() @@ -435,8 +438,9 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique } statsBuffer := statsWriter.GetBuffer() statsBlobs = append(statsBlobs, &Blob{ - Key: blobKey, - Value: statsBuffer, + Key: blobKey, + Value: statsBuffer, + RowNum: rowNum, }) } } diff --git a/internal/storage/index_data_codec_test.go b/internal/storage/index_data_codec_test.go index 7c350a4268..75725f2823 100644 --- a/internal/storage/index_data_codec_test.go +++ b/internal/storage/index_data_codec_test.go @@ -118,19 +118,19 @@ func TestIndexCodec(t *testing.T) { indexCodec := NewIndexCodec() blobs := []*Blob{ { - "12345", - []byte{1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7}, - 14, + Key: "12345", + Value: []byte{1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7}, + Size: 14, }, { - "6666", - []byte{6, 6, 6, 6, 6, 1, 2, 3, 4, 5, 6, 7}, - 12, + Key: "6666", + Value: []byte{6, 6, 6, 6, 6, 1, 2, 3, 4, 5, 6, 7}, + Size: 12, }, { - "8885", - []byte{8, 8, 8, 8, 8, 8, 8, 8, 2, 3, 4, 5, 6, 7}, - 14, + Key: "8885", + Value: []byte{8, 8, 8, 8, 8, 8, 8, 8, 2, 3, 4, 5, 6, 7}, + Size: 14, }, } indexParams := map[string]string{