Cache segment row num, size, and insert count to reduce CGO calls (#28007)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/28026/head
yah01 2023-10-30 17:54:14 +08:00 committed by GitHub
parent ab6b0103a3
commit 9b6eeb46f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 42 additions and 20 deletions

View File

@@ -144,8 +144,11 @@ type LocalSegment struct {
ptrLock sync.RWMutex // protects segmentPtr
ptr C.CSegmentInterface
size int64
row int64
// cached results, to avoid too many CGO calls
memSize *atomic.Int64
rowNum *atomic.Int64
insertCount *atomic.Int64
lastDeltaTimestamp *atomic.Uint64
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo]
}
@@ -185,6 +188,10 @@ func NewSegment(collection *Collection,
ptr: segmentPtr,
lastDeltaTimestamp: atomic.NewUint64(0),
fieldIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](),
memSize: atomic.NewInt64(-1),
rowNum: atomic.NewInt64(-1),
insertCount: atomic.NewInt64(0),
}
return segment, nil
@@ -217,13 +224,8 @@ func (s *LocalSegment) InsertCount() int64 {
if !s.isValid() {
return 0
}
var rowCount C.int64_t
GetDynamicPool().Submit(func() (any, error) {
rowCount = C.GetRowCount(s.ptr)
return nil, nil
}).Await()
return int64(rowCount)
return s.insertCount.Load()
}
func (s *LocalSegment) RowNum() int64 {
@@ -233,13 +235,19 @@ func (s *LocalSegment) RowNum() int64 {
if !s.isValid() {
return 0
}
var rowCount C.int64_t
GetDynamicPool().Submit(func() (any, error) {
rowCount = C.GetRealCount(s.ptr)
return nil, nil
}).Await()
return int64(rowCount)
rowNum := s.rowNum.Load()
if rowNum < 0 {
var rowCount C.int64_t
GetDynamicPool().Submit(func() (any, error) {
rowCount = C.GetRealCount(s.ptr)
s.rowNum.Store(int64(rowCount))
return nil, nil
}).Await()
rowNum = int64(rowCount)
}
return rowNum
}
func (s *LocalSegment) MemSize() int64 {
@@ -249,13 +257,19 @@ func (s *LocalSegment) MemSize() int64 {
if !s.isValid() {
return 0
}
var memoryUsageInBytes C.int64_t
GetDynamicPool().Submit(func() (any, error) {
memoryUsageInBytes = C.GetMemoryUsageInBytes(s.ptr)
return nil, nil
}).Await()
return int64(memoryUsageInBytes)
memSize := s.memSize.Load()
if memSize < 0 {
var cMemSize C.int64_t
GetDynamicPool().Submit(func() (any, error) {
cMemSize = C.GetMemoryUsageInBytes(s.ptr)
s.memSize.Store(int64(cMemSize))
return nil, nil
}).Await()
memSize = int64(cMemSize)
}
return memSize
}
func (s *LocalSegment) LastDeltaTimestamp() uint64 {
@@ -498,6 +512,10 @@ func (s *LocalSegment) Insert(rowIDs []int64, timestamps []typeutil.Timestamp, r
if err := HandleCStatus(&status, "Insert failed"); err != nil {
return err
}
s.insertCount.Add(int64(numOfRow))
s.rowNum.Store(-1)
s.memSize.Store(-1)
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(s.collectionID),
@@ -576,6 +594,7 @@ func (s *LocalSegment) Delete(primaryKeys []storage.PrimaryKey, timestamps []typ
return err
}
s.rowNum.Store(-1)
s.lastDeltaTimestamp.Store(timestamps[len(timestamps)-1])
return nil
@@ -628,6 +647,7 @@ func (s *LocalSegment) LoadMultiFieldData(rowCount int64, fields []*datapb.Field
return err
}
s.insertCount.Store(rowCount)
log.Info("load mutil field done",
zap.Int64("row count", rowCount),
zap.Int64("segmentID", s.ID()))
@@ -681,6 +701,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
return err
}
s.insertCount.Store(rowCount)
log.Info("load field done")
return nil
@@ -804,6 +825,7 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
return err
}
s.rowNum.Store(-1)
s.lastDeltaTimestamp.Store(tss[len(tss)-1])
log.Info("load deleted record done",