mirror of https://github.com/milvus-io/milvus.git
enhance: reduce SyncTask AllocID call and refine code (#29701)
See also #27675. `Allocator.Alloc` and `Allocator.AllocOne` might be invoked multiple times if there were multiple blobs set in one sync task. This PR adds pre-fetch logic for all blobs and caches the log IDs in the sync task, so that at most one call to the allocator is needed. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> pull/29686/head
parent
87d373bf34
commit
dc6a6a50fa
|
@ -80,6 +80,9 @@ type SyncTask struct {
|
|||
deltaBlob *storage.Blob
|
||||
deltaRowCount int64
|
||||
|
||||
// prefetched log ids
|
||||
ids []int64
|
||||
|
||||
segmentData map[string][]byte
|
||||
|
||||
writeRetryOpts []retry.Option
|
||||
|
@ -141,27 +144,15 @@ func (t *SyncTask) Run() (err error) {
|
|||
t.segmentID = t.segment.CompactTo()
|
||||
}
|
||||
|
||||
err = t.processInsertBlobs()
|
||||
err = t.prefetchIDs()
|
||||
if err != nil {
|
||||
log.Warn("failed to process insert blobs", zap.Error(err))
|
||||
log.Warn("failed allocate ids for sync task", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.processStatsBlob()
|
||||
if err != nil {
|
||||
log.Warn("failed to serialize insert data", zap.Error(err))
|
||||
t.handleError(err, metricSegLevel)
|
||||
log.Warn("failed to process stats blobs", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = t.processDeltaBlob()
|
||||
if err != nil {
|
||||
log.Warn("failed to serialize delete data", zap.Error(err))
|
||||
t.handleError(err, metricSegLevel)
|
||||
log.Warn("failed to process delta blobs", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
t.processInsertBlobs()
|
||||
t.processStatsBlob()
|
||||
t.processDeltaBlob()
|
||||
|
||||
err = t.writeLogs()
|
||||
if err != nil {
|
||||
|
@ -210,18 +201,35 @@ func (t *SyncTask) Run() (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *SyncTask) processInsertBlobs() error {
|
||||
if len(t.binlogBlobs) == 0 {
|
||||
return nil
|
||||
// prefetchIDs pre-allocates ids depending on the number of blobs the current task contains.
|
||||
func (t *SyncTask) prefetchIDs() error {
|
||||
totalIDCount := len(t.binlogBlobs)
|
||||
if t.batchStatsBlob != nil {
|
||||
totalIDCount++
|
||||
}
|
||||
|
||||
logidx, _, err := t.allocator.Alloc(uint32(len(t.binlogBlobs)))
|
||||
if t.deltaBlob != nil {
|
||||
totalIDCount++
|
||||
}
|
||||
start, _, err := t.allocator.Alloc(uint32(totalIDCount))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.ids = lo.RangeFrom(start, totalIDCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *SyncTask) nextID() int64 {
|
||||
if len(t.ids) == 0 {
|
||||
panic("pre-fetched ids exhausted")
|
||||
}
|
||||
r := t.ids[0]
|
||||
t.ids = t.ids[1:]
|
||||
return r
|
||||
}
|
||||
|
||||
func (t *SyncTask) processInsertBlobs() {
|
||||
for fieldID, blob := range t.binlogBlobs {
|
||||
k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, logidx)
|
||||
k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, t.nextID())
|
||||
key := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, k)
|
||||
t.segmentData[key] = blob.GetValue()
|
||||
t.appendBinlog(fieldID, &datapb.Binlog{
|
||||
|
@ -231,38 +239,25 @@ func (t *SyncTask) processInsertBlobs() error {
|
|||
LogPath: key,
|
||||
LogSize: t.binlogMemsize[fieldID],
|
||||
})
|
||||
logidx++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *SyncTask) processStatsBlob() error {
|
||||
func (t *SyncTask) processStatsBlob() {
|
||||
if t.batchStatsBlob != nil {
|
||||
logidx, err := t.allocator.AllocOne()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), logidx, t.batchSize)
|
||||
t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), t.nextID(), t.batchSize)
|
||||
}
|
||||
if t.mergedStatsBlob != nil {
|
||||
totalRowNum := t.segment.NumOfRows()
|
||||
t.convertBlob2StatsBinlog(t.mergedStatsBlob, t.pkField.GetFieldID(), int64(storage.CompoundStatsType), totalRowNum)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *SyncTask) processDeltaBlob() error {
|
||||
func (t *SyncTask) processDeltaBlob() {
|
||||
if t.deltaBlob != nil {
|
||||
logID, err := t.allocator.AllocOne()
|
||||
if err != nil {
|
||||
log.Error("failed to alloc ID", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
value := t.deltaBlob.GetValue()
|
||||
data := &datapb.Binlog{}
|
||||
|
||||
blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, logID)
|
||||
blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, t.nextID())
|
||||
blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
|
||||
|
||||
t.segmentData[blobPath] = value
|
||||
|
@ -273,7 +268,6 @@ func (t *SyncTask) processDeltaBlob() error {
|
|||
data.EntriesNum = t.deltaRowCount
|
||||
t.appendDeltalog(data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID int64, rowNum int64) {
|
||||
|
|
|
@ -158,6 +158,7 @@ func (s *SyncTaskSuite) getSuiteSyncTask() *SyncTask {
|
|||
WithChunkManager(s.chunkManager).
|
||||
WithAllocator(s.allocator).
|
||||
WithMetaCache(s.metacache)
|
||||
task.binlogMemsize = map[int64]int64{0: 1, 1: 1, 100: 100}
|
||||
|
||||
return task
|
||||
}
|
||||
|
@ -345,6 +346,17 @@ func (s *SyncTaskSuite) TestRunError() {
|
|||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
|
||||
s.Run("allocate_id_fail", func() {
|
||||
mockAllocator := allocator.NewMockAllocator(s.T())
|
||||
mockAllocator.EXPECT().Alloc(mock.Anything).Return(0, 0, errors.New("mocked"))
|
||||
|
||||
task := s.getSuiteSyncTask()
|
||||
task.allocator = mockAllocator
|
||||
|
||||
err := task.Run()
|
||||
s.Error(err)
|
||||
})
|
||||
|
||||
s.Run("metawrite_fail", func() {
|
||||
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked"))
|
||||
|
||||
|
@ -382,6 +394,21 @@ func (s *SyncTaskSuite) TestRunError() {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *SyncTaskSuite) TestNextID() {
|
||||
task := s.getSuiteSyncTask()
|
||||
|
||||
task.ids = []int64{0}
|
||||
s.Run("normal_next", func() {
|
||||
id := task.nextID()
|
||||
s.EqualValues(0, id)
|
||||
})
|
||||
s.Run("id_exhausted", func() {
|
||||
s.Panics(func() {
|
||||
task.nextID()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestSyncTask(t *testing.T) {
|
||||
suite.Run(t, new(SyncTaskSuite))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue