avoid add empty growing segment to delegator distribution (#27930)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/27941/head
wei liu 2023-10-26 10:10:10 +08:00 committed by GitHub
parent 7616414083
commit 8041fc3c75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 23 deletions

View File

@ -76,28 +76,6 @@ func (d *DeleteData) Append(ad DeleteData) {
d.RowCount += ad.RowCount
}
func (sd *shardDelegator) newGrowing(segmentID int64, insertData *InsertData) segments.Segment {
log := sd.getLogger(context.Background()).With(zap.Int64("segmentID", segmentID))
segment, err := segments.NewSegment(sd.collection, segmentID, insertData.PartitionID, sd.collectionID, sd.vchannelName, segments.SegmentTypeGrowing, 0, insertData.StartPosition, insertData.StartPosition)
if err != nil {
log.Error("failed to create new segment", zap.Error(err))
panic(err)
}
sd.pkOracle.Register(segment, paramtable.GetNodeID())
sd.segmentManager.Put(segments.SegmentTypeGrowing, segment)
sd.addGrowing(SegmentEntry{
NodeID: paramtable.GetNodeID(),
SegmentID: segmentID,
PartitionID: insertData.PartitionID,
Version: 0,
TargetVersion: initialTargetVersion,
})
return segment
}
// ProcessInsert handles insert data in delegator.
func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
method := "ProcessInsert"
@ -106,7 +84,15 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
for segmentID, insertData := range insertRecords {
growing := sd.segmentManager.GetGrowing(segmentID)
if growing == nil {
growing = sd.newGrowing(segmentID, insertData)
var err error
growing, err = segments.NewSegment(sd.collection, segmentID, insertData.PartitionID, sd.collectionID, sd.vchannelName,
segments.SegmentTypeGrowing, 0, insertData.StartPosition, insertData.StartPosition)
if err != nil {
log.Error("failed to create new segment",
zap.Int64("segmentID", segmentID),
zap.Error(err))
panic(err)
}
}
err := growing.Insert(insertData.RowIDs, insertData.Timestamps, insertData.InsertRecord)
@ -124,6 +110,19 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
}
growing.UpdateBloomFilter(insertData.PrimaryKeys)
if !sd.pkOracle.Exists(growing, paramtable.GetNodeID()) {
// register created growing segment after insert, avoid to add empty growing to delegator
sd.pkOracle.Register(growing, paramtable.GetNodeID())
sd.segmentManager.Put(segments.SegmentTypeGrowing, growing)
sd.addGrowing(SegmentEntry{
NodeID: paramtable.GetNodeID(),
SegmentID: segmentID,
PartitionID: insertData.PartitionID,
Version: 0,
TargetVersion: initialTargetVersion,
})
}
log.Debug("insert into growing segment",
zap.Int64("collectionID", growing.Collection()),
zap.Int64("segmentID", segmentID),