diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index df7ebc65d9..0c26db1126 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -813,7 +813,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa zap.Uint64("binlogTimestampTo", l.TimestampTo), zap.Uint64("compactExpireTime", compactTime.expireTime)) totalExpiredRows += int(l.GetEntriesNum()) - totalExpiredSize += l.GetLogSize() + totalExpiredSize += l.GetMemorySize() } } } @@ -831,7 +831,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa for _, deltaLogs := range segment.GetDeltalogs() { for _, l := range deltaLogs.GetBinlogs() { totalDeletedRows += int(l.GetEntriesNum()) - totalDeleteLogSize += l.GetLogSize() + totalDeleteLogSize += l.GetMemorySize() } } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index a99c10d810..417bf92b04 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -893,7 +893,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: 100}, + {EntriesNum: 5, LogPath: "log1", LogSize: 100, MemorySize: 100}, }, }, }, @@ -913,7 +913,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log2", LogSize: Params.DataCoordCfg.SegmentMaxSize.GetAsInt64()*1024*1024 - 1}, + {EntriesNum: 5, LogPath: "log2", LogSize: Params.DataCoordCfg.SegmentMaxSize.GetAsInt64()*1024*1024 - 1, MemorySize: Params.DataCoordCfg.SegmentMaxSize.GetAsInt64()*1024*1024 - 1}, }, }, }, @@ -1015,7 +1015,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: numRows, LogPath: 
"log1", LogSize: 100}, + {EntriesNum: numRows, LogPath: "log1", LogSize: 100, MemorySize: 100}, }, }, }, @@ -1207,7 +1207,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: numRows * 1024 * 1024}, + {EntriesNum: 5, LogPath: "log1", LogSize: numRows * 1024 * 1024, MemorySize: numRows * 1024 * 1024}, }, }, }, @@ -1400,7 +1400,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: numRows * 1024 * 1024}, + {EntriesNum: 5, LogPath: "log1", LogSize: numRows * 1024 * 1024, MemorySize: numRows * 1024 * 1024}, }, }, }, @@ -1627,7 +1627,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: size[i] * 2 * 1024 * 1024}, + {EntriesNum: 5, LogPath: "log1", LogSize: size[i] * 2 * 1024 * 1024, MemorySize: size[i] * 2 * 1024 * 1024}, }, }, }, @@ -1766,7 +1766,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { for i := UniqueID(0); i < 1000; i++ { binlogs = append(binlogs, &datapb.FieldBinlog{ Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: 100}, + {EntriesNum: 5, LogPath: "log1", LogSize: 100, MemorySize: 100}, }, }) } @@ -1810,7 +1810,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { for i := UniqueID(0); i < 100; i++ { binlogs2 = append(binlogs2, &datapb.FieldBinlog{ Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: 100000, TimestampFrom: 300, TimestampTo: 500}, + {EntriesNum: 5, LogPath: "log1", LogSize: 100000, TimestampFrom: 300, TimestampTo: 500, MemorySize: 100000}, }, }) } @@ -1818,7 +1818,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { for i := UniqueID(0); i < 100; i++ { binlogs2 = append(binlogs2, 
&datapb.FieldBinlog{ Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: 1000000, TimestampFrom: 300, TimestampTo: 1000}, + {EntriesNum: 5, LogPath: "log1", LogSize: 1000000, TimestampFrom: 300, TimestampTo: 1000, MemorySize: 1000000}, }, }) } @@ -1853,7 +1853,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { for i := UniqueID(0); i < 100; i++ { binlogs3 = append(binlogs2, &datapb.FieldBinlog{ Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: 100000, TimestampFrom: 300, TimestampTo: 500}, + {EntriesNum: 5, LogPath: "log1", LogSize: 100000, TimestampFrom: 300, TimestampTo: 500, MemorySize: 100000}, }, }) } @@ -2162,7 +2162,7 @@ func (s *CompactionTriggerSuite) genSeg(segID, numRows int64) *datapb.SegmentInf Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: 100}, + {EntriesNum: 5, LogPath: "log1", LogSize: 100, MemorySize: 100}, }, }, }, diff --git a/internal/datacoord/compaction_view.go b/internal/datacoord/compaction_view.go index a523cb007b..b82106213c 100644 --- a/internal/datacoord/compaction_view.go +++ b/internal/datacoord/compaction_view.go @@ -4,12 +4,10 @@ import ( "fmt" "github.com/samber/lo" - "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/pkg/log" ) type CompactionView interface { @@ -166,29 +164,11 @@ func GetBinlogCount(fieldBinlogs []*datapb.FieldBinlog) int { return num } -func GetExpiredSizeAsBytes(expireTime Timestamp, fieldBinlogs []*datapb.FieldBinlog) float64 { - var expireSize float64 - for _, binlogs := range fieldBinlogs { - for _, l := range binlogs.GetBinlogs() { - // TODO, we should probably estimate expired log entries by total rows - // in binlog and the ralationship of timeTo, timeFrom and expire time - if l.TimestampTo < expireTime { - log.Info("mark binlog as 
expired", - zap.Int64("binlogID", l.GetLogID()), - zap.Uint64("binlogTimestampTo", l.TimestampTo), - zap.Uint64("compactExpireTime", expireTime)) - expireSize += float64(l.GetLogSize()) - } - } - } - return expireSize -} - func GetBinlogSizeAsBytes(deltaBinlogs []*datapb.FieldBinlog) float64 { var deltaSize float64 for _, deltaLogs := range deltaBinlogs { for _, l := range deltaLogs.GetBinlogs() { - deltaSize += float64(l.GetLogSize()) + deltaSize += float64(l.GetMemorySize()) } } return deltaSize diff --git a/internal/datacoord/compaction_view_manager_test.go b/internal/datacoord/compaction_view_manager_test.go index d1712e6d9e..4567e80aa1 100644 --- a/internal/datacoord/compaction_view_manager_test.go +++ b/internal/datacoord/compaction_view_manager_test.go @@ -329,7 +329,8 @@ func genTestDeltalogs(logCount int, logSize int64) []*datapb.FieldBinlog { for i := 0; i < logCount; i++ { binlog := &datapb.Binlog{ - LogSize: logSize, + LogSize: logSize, + MemorySize: logSize, } binlogs = append(binlogs, binlog) } diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 4ab3d16819..5946934e88 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -414,19 +414,19 @@ func (s *SegmentInfo) getSegmentSize() int64 { var size int64 for _, binlogs := range s.GetBinlogs() { for _, l := range binlogs.GetBinlogs() { - size += l.GetLogSize() + size += l.GetMemorySize() } } for _, deltaLogs := range s.GetDeltalogs() { for _, l := range deltaLogs.GetBinlogs() { - size += l.GetLogSize() + size += l.GetMemorySize() } } for _, statsLogs := range s.GetStatslogs() { for _, l := range statsLogs.GetBinlogs() { - size += l.GetLogSize() + size += l.GetMemorySize() } } if size > 0 { diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index e94da52c93..f96a67008e 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -148,19 +148,19 @@ func getCompactedSegmentSize(s 
*datapb.CompactionSegment) int64 { if s != nil { for _, binlogs := range s.GetInsertLogs() { for _, l := range binlogs.GetBinlogs() { - segmentSize += l.GetLogSize() + segmentSize += l.GetMemorySize() } } for _, deltaLogs := range s.GetDeltalogs() { for _, l := range deltaLogs.GetBinlogs() { - segmentSize += l.GetLogSize() + segmentSize += l.GetMemorySize() } } - for _, statsLogs := range s.GetDeltalogs() { + for _, statsLogs := range s.GetField2StatslogPaths() { for _, l := range statsLogs.GetBinlogs() { - segmentSize += l.GetLogSize() + segmentSize += l.GetMemorySize() } } } @@ -232,7 +232,7 @@ func calculateL0SegmentSize(fields []*datapb.FieldBinlog) float64 { size := int64(0) for _, field := range fields { for _, binlog := range field.GetBinlogs() { - size += binlog.GetLogSize() + size += binlog.GetMemorySize() } } return float64(size) diff --git a/internal/datacoord/util_test.go b/internal/datacoord/util_test.go index 1b05364494..c8a48c0cfa 100644 --- a/internal/datacoord/util_test.go +++ b/internal/datacoord/util_test.go @@ -194,7 +194,7 @@ func (suite *UtilSuite) TestCalculateL0SegmentSize() { logsize := int64(100) fields := []*datapb.FieldBinlog{{ FieldID: 102, - Binlogs: []*datapb.Binlog{{LogSize: logsize}}, + Binlogs: []*datapb.Binlog{{LogSize: logsize, MemorySize: logsize}}, }} suite.Equal(calculateL0SegmentSize(fields), float64(logsize)) diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index a351f0f260..c6ff5425f6 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -106,7 +106,7 @@ func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data *InsertDa kvs[key] = value inpaths[fID] = &datapb.FieldBinlog{ FieldID: fID, - Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: blob.RowNum}}, + Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: blob.RowNum, MemorySize: blob.GetMemorySize()}}, } } @@ -135,7 +135,7 @@ func genStatBlobs(b 
io.BinlogIO, allocator allocator.Allocator, stats *storage.P statPaths[fID] = &datapb.FieldBinlog{ FieldID: fID, - Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: totRows}}, + Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: totRows, MemorySize: int64(fileLen)}}, } return statPaths, nil } @@ -238,6 +238,7 @@ func uploadDeltaLog( EntriesNum: dData.RowCount, LogPath: k, LogSize: int64(len(v)), + MemorySize: dData.Size(), }}, }) } else { diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go index 5031cc775f..b211c78749 100644 --- a/internal/datanode/l0_compactor.go +++ b/internal/datanode/l0_compactor.go @@ -154,7 +154,7 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error for _, d := range s.GetDeltalogs() { for _, l := range d.GetBinlogs() { paths = append(paths, l.GetLogPath()) - totalSize += l.GetLogSize() + totalSize += l.GetMemorySize() } } if len(paths) > 0 { @@ -349,6 +349,7 @@ func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storag LogID: logID, TimestampFrom: minTs, TimestampTo: maxTs, + MemorySize: dData.Size(), } return uploadKv, deltalog, nil diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index efe7367ffc..a2a6af3233 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -259,7 +259,8 @@ func (t *SyncTask) processInsertBlobs() { TimestampFrom: t.tsFrom, TimestampTo: t.tsTo, LogPath: key, - LogSize: t.binlogMemsize[fieldID], + LogSize: int64(len(blob.GetValue())), + MemorySize: t.binlogMemsize[fieldID], }) } } @@ -288,6 +289,7 @@ func (t *SyncTask) processDeltaBlob() { data.TimestampFrom = t.tsFrom data.TimestampTo = t.tsTo data.EntriesNum = t.deltaRowCount + data.MemorySize = t.deltaBlob.GetMemorySize() t.appendDeltalog(data) } } @@ -304,6 +306,7 @@ func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID in TimestampTo: 
t.tsTo, LogPath: key, LogSize: int64(len(value)), + MemorySize: int64(len(value)), }) } diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index eaa33ba18f..a7cae134c9 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -193,6 +193,13 @@ func (kc *Catalog) listBinlogs(binlogType storage.BinlogType) (map[typeutil.Uniq return fmt.Errorf("prefix:%s, %w", path.Join(kc.metaRootpath, logPathPrefix), err) } + // set memory size to log size if memory size is zero, for old segments written before v2.4.3 + for i, b := range fieldBinlog.GetBinlogs() { + if b.GetMemorySize() == 0 { + fieldBinlog.Binlogs[i].MemorySize = b.GetLogSize() + } + } + // no need to set log path and only store log id ret[segmentID] = append(ret[segmentID], fieldBinlog) return nil diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index b8a14bc4df..da8a36597a 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -405,6 +405,10 @@ message Binlog { string log_path = 4; int64 log_size = 5; int64 logID = 6; + // memory_size represents the size occupied by loading data into memory. + // log_size represents the size of the data after serialization. + // for stats_log, memory_size always equals log_size. 
+ int64 memory_size = 7; } message GetRecoveryInfoResponse { diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 40c5db8962..cbcb9fced7 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -124,6 +124,22 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection return nil, nil, err } + // fallback binlog memory size to log size when it is zero + fallbackBinlogMemorySize := func(binlogs []*datapb.FieldBinlog) { + for _, insertBinlogs := range binlogs { + for _, b := range insertBinlogs.GetBinlogs() { + if b.GetMemorySize() == 0 { + b.MemorySize = b.GetLogSize() + } + } + } + } + for _, segBinlogs := range recoveryInfo.GetBinlogs() { + fallbackBinlogMemorySize(segBinlogs.GetFieldBinlogs()) + fallbackBinlogMemorySize(segBinlogs.GetStatslogs()) + fallbackBinlogMemorySize(segBinlogs.GetDeltalogs()) + } + return recoveryInfo.Channels, recoveryInfo.Binlogs, nil } diff --git a/internal/querynodev2/segments/index_attr_cache.go b/internal/querynodev2/segments/index_attr_cache.go index 279259f146..73f1cfbe9f 100644 --- a/internal/querynodev2/segments/index_attr_cache.go +++ b/internal/querynodev2/segments/index_attr_cache.go @@ -68,7 +68,7 @@ func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo if indexType == indexparamcheck.IndexINVERTED { neededMemSize := 0 // we will mmap the binlog if the index type is inverted index. 
- neededDiskSize := indexInfo.IndexSize + getBinlogDataSize(fieldBinlog) + neededDiskSize := indexInfo.IndexSize + getBinlogDataDiskSize(fieldBinlog) return uint64(neededMemSize), uint64(neededDiskSize), nil } diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 7d6005d64b..c9a0df822e 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1422,7 +1422,7 @@ func (s *LocalSegment) UpdateFieldRawDataSize(ctx context.Context, numRows int64 fieldID := fieldBinlog.FieldID fieldDataSize := int64(0) for _, binlog := range fieldBinlog.GetBinlogs() { - fieldDataSize += binlog.LogSize + fieldDataSize += binlog.GetMemorySize() } GetDynamicPool().Submit(func() (any, error) { status = C.UpdateFieldRawDataSize(s.ptr, C.int64_t(fieldID), C.int64_t(numRows), C.int64_t(fieldDataSize)) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 4256fd4529..7c6f83dda7 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -1612,10 +1612,10 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn } else { mmapEnabled = common.IsFieldMmapEnabled(schema, fieldID) || (!common.FieldHasMmapKey(schema, fieldID) && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool()) - binlogSize := uint64(getBinlogDataSize(fieldBinlog)) + binlogSize := uint64(getBinlogDataMemorySize(fieldBinlog)) segmentMemorySize += binlogSize if mmapEnabled { - segmentDiskSize += binlogSize + segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog)) } else { if multiplyFactor.enableTempSegmentIndex { segmentMemorySize += uint64(float64(binlogSize) * multiplyFactor.tempSegmentIndexFactor) @@ -1629,7 +1629,7 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn // get size of stats data for _, fieldBinlog := range loadInfo.Statslogs { - 
segmentMemorySize += uint64(getBinlogDataSize(fieldBinlog)) + segmentMemorySize += uint64(getBinlogDataMemorySize(fieldBinlog)) } // binlog & statslog use general load factor @@ -1637,7 +1637,7 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn // get size of delete data for _, fieldBinlog := range loadInfo.Deltalogs { - segmentMemorySize += uint64(float64(getBinlogDataSize(fieldBinlog)) * multiplyFactor.deltaDataExpansionFactor) + segmentMemorySize += uint64(float64(getBinlogDataMemorySize(fieldBinlog)) * multiplyFactor.deltaDataExpansionFactor) } return &ResourceUsage{ MemorySize: segmentMemorySize, @@ -1725,10 +1725,19 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, []int64{loadInfo.GetSegmentID()}, version) } -func getBinlogDataSize(fieldBinlog *datapb.FieldBinlog) int64 { +func getBinlogDataDiskSize(fieldBinlog *datapb.FieldBinlog) int64 { fieldSize := int64(0) for _, binlog := range fieldBinlog.Binlogs { - fieldSize += binlog.LogSize + fieldSize += binlog.GetLogSize() + } + + return fieldSize +} + +func getBinlogDataMemorySize(fieldBinlog *datapb.FieldBinlog) int64 { + fieldSize := int64(0) + for _, binlog := range fieldBinlog.Binlogs { + fieldSize += binlog.GetMemorySize() } return fieldSize diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index c163ae2352..138fed79b7 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -517,7 +517,8 @@ func (suite *SegmentLoaderSuite) TestLoadIndexWithLimitedResource() { FieldID: 1, Binlogs: []*datapb.Binlog{ { - LogSize: 1000000000, + LogSize: 1000000000, + MemorySize: 1000000000, }, }, }, @@ -860,8 +861,8 @@ func (suite *SegmentLoaderDetailSuite) TestRequestResource() { Deltalogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ 
- {LogSize: 10000}, - {LogSize: 12000}, + {LogSize: 10000, MemorySize: 10000}, + {LogSize: 12000, MemorySize: 12000}, }, }, }, diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 0d278a9ccd..c05de4c83d 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -78,7 +78,8 @@ func (suite *SegmentSuite) SetupTest() { FieldID: 101, Binlogs: []*datapb.Binlog{ { - LogSize: 10086, + LogSize: 10086, + MemorySize: 10086, }, }, }, diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 686f498552..c5b58cca05 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -422,6 +422,22 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen return merr.Status(err), nil } + // fallback binlog memory size to log size when it is zero + fallbackBinlogMemorySize := func(binlogs []*datapb.FieldBinlog) { + for _, insertBinlogs := range binlogs { + for _, b := range insertBinlogs.GetBinlogs() { + if b.GetMemorySize() == 0 { + b.MemorySize = b.GetLogSize() + } + } + } + } + for _, s := range req.GetInfos() { + fallbackBinlogMemorySize(s.GetBinlogPaths()) + fallbackBinlogMemorySize(s.GetStatslogs()) + fallbackBinlogMemorySize(s.GetDeltalogs()) + } + // Delegates request to workers if req.GetNeedTransfer() { delegator, ok := node.delegators.Get(segment.GetInsertChannel()) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index b44e320c1d..61babb9227 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -68,10 +68,10 @@ const InvalidUniqueID = UniqueID(-1) // Blob is a pack of key&value type Blob struct { - Key string - Value []byte - Size int64 - RowNum int64 + Key string + Value []byte + MemorySize int64 + RowNum int64 } // BlobList implements sort.Interface for a list of Blob @@ -110,6 +110,11 @@ func (b Blob) GetValue() []byte { 
return b.Value } +// GetMemorySize returns the memory size of blob +func (b Blob) GetMemorySize() int64 { + return b.MemorySize +} + // InsertCodec serializes and deserializes the insert data // Blob key example: // ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx} @@ -418,9 +423,10 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique } blobKey := fmt.Sprintf("%d", field.FieldID) blobs = append(blobs, &Blob{ - Key: blobKey, - Value: buffer, - RowNum: rowNum, + Key: blobKey, + Value: buffer, + RowNum: rowNum, + MemorySize: int64(singleData.GetMemorySize()), }) eventWriter.Close() writer.Close() @@ -1018,7 +1024,8 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni return nil, err } blob := &Blob{ - Value: buffer, + Value: buffer, + MemorySize: data.Size(), } return blob, nil } diff --git a/internal/storage/index_data_codec_test.go b/internal/storage/index_data_codec_test.go index 170dc80034..c1759ee970 100644 --- a/internal/storage/index_data_codec_test.go +++ b/internal/storage/index_data_codec_test.go @@ -120,19 +120,19 @@ func TestIndexCodec(t *testing.T) { indexCodec := NewIndexCodec() blobs := []*Blob{ { - Key: "12345", - Value: []byte{1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7}, - Size: 14, + Key: "12345", + Value: []byte{1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7}, + MemorySize: 14, }, { - Key: "6666", - Value: []byte{6, 6, 6, 6, 6, 1, 2, 3, 4, 5, 6, 7}, - Size: 12, + Key: "6666", + Value: []byte{6, 6, 6, 6, 6, 1, 2, 3, 4, 5, 6, 7}, + MemorySize: 12, }, { - Key: "8885", - Value: []byte{8, 8, 8, 8, 8, 8, 8, 8, 2, 3, 4, 5, 6, 7}, - Size: 14, + Key: "8885", + Value: []byte{8, 8, 8, 8, 8, 8, 8, 8, 2, 3, 4, 5, 6, 7}, + MemorySize: 14, }, } indexParams := map[string]string{ diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go index 9ed3ae73f2..f0850b3b91 100644 --- a/internal/util/indexcgowrapper/index.go +++ 
b/internal/util/indexcgowrapper/index.go @@ -284,9 +284,9 @@ func (index *CgoIndex) Serialize() ([]*Blob, error) { return nil, err } blob := &Blob{ - Key: key, - Value: value, - Size: size, + Key: key, + Value: value, + MemorySize: size, } ret = append(ret, blob) }