mirror of https://github.com/milvus-io/milvus.git
fix: Fix the issue of concurrent packing of the same segment (#34840)
issue: #34703
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
branch: pull/34913/head
parent 12b49859c0
commit 643b9d521c
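In brief: before this patch, both the size-triggered flush in mappingSegment and the memory-pressure flush in flushLargestBuffers called the same refreshBufferWriter, and the resulting pack flag was consumed by flushBinlog inside an asynchronously executed flushPool closure, so two flush paths could pack the same segment concurrently. The patch renames the pack-deciding variant to refreshBufferWriterWithPack and keeps it only on the size-triggered path, gives the memory-pressure path a non-packing refreshBufferWriter that always flushes with pack=false, and replaces a nil-assignment with delete in packBufferToSegment so the new checkBuffersAfterCompaction can fail the task if any flushed binlogs were never packed.

A minimal sketch of the hazard and the split-helper fix; all names here (clusterBuffer, rotateWriterWithPack, rotateWriter) are hypothetical stand-ins, not the milvus types:

package main

// Illustrative sketch only: clusterBuffer, rotateWriterWithPack, and
// rotateWriter are hypothetical stand-ins for the patched helpers.

import (
	"fmt"
	"sync"
)

type clusterBuffer struct {
	mu        sync.Mutex
	segmentID int64
	rows      int64
	maxRows   int64
}

// Size-triggered path (mappingSegment): may seal ("pack") the current
// segment and rotate to a fresh one; only this path may return pack=true.
func (b *clusterBuffer) rotateWriterWithPack(nextSegID int64) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.rows < b.maxRows {
		return false
	}
	b.segmentID = nextSegID // rotate: subsequent rows go to a new segment
	b.rows = 0
	return true // caller flushes with pack=true, exactly once per segment
}

// Memory-pressure path (flushLargestBuffers): refreshes the writer but
// keeps the segment ID, so the caller always flushes with pack=false and
// can never race the size-triggered path into a double pack.
func (b *clusterBuffer) rotateWriter() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.rows = 0 // same segmentID; binlogs accumulate until a real pack
}

func main() {
	b := &clusterBuffer{segmentID: 1, rows: 100, maxRows: 100}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); fmt.Println("size path packs:", b.rotateWriterWithPack(2)) }()
	go func() { defer wg.Done(); b.rotateWriter(); fmt.Println("memory path packs: false") }()
	wg.Wait()
}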
@@ -307,7 +307,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 			uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0),
 			clusteringKeyFieldStats: fieldStats,
 		}
-		if _, err = t.refreshBufferWriter(buffer); err != nil {
+		if _, err = t.refreshBufferWriterWithPack(buffer); err != nil {
 			return err
 		}
 		t.clusterBuffers = append(t.clusterBuffers, buffer)
@@ -361,7 +361,7 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
 		uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0),
 		clusteringKeyFieldStats: fieldStats,
 	}
-	if _, err = t.refreshBufferWriter(clusterBuffer); err != nil {
+	if _, err = t.refreshBufferWriterWithPack(clusterBuffer); err != nil {
 		return err
 	}
 	t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
@@ -414,6 +414,10 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
 		return nil, nil, err
 	}

+	if err := t.checkBuffersAfterCompaction(); err != nil {
+		return nil, nil, err
+	}
+
 	resultSegments := make([]*datapb.CompactionSegment, 0)
 	resultPartitionStats := &storage.PartitionStatsSnapshot{
 		SegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats),
@@ -603,7 +607,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 			// reach segment/binlog max size
 			t.clusterBufferLocks.Lock(clusterBuffer.id)
 			writer := clusterBuffer.writer
-			pack, _ := t.refreshBufferWriter(clusterBuffer)
+			pack, _ := t.refreshBufferWriterWithPack(clusterBuffer)
 			log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id),
 				zap.Bool("pack", pack),
 				zap.Int64("current segment", writer.GetSegmentID()),
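Note: the size-triggered path in mappingSegment keeps its pack semantics unchanged; the rename to refreshBufferWriterWithPack just makes explicit that this is now the only call site allowed to decide a pack.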
@@ -763,7 +767,10 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
 		buffer := t.clusterBuffers[bufferId]
 		writer := buffer.writer
 		currentMemorySize -= int64(writer.WrittenMemorySize())
-		pack, _ := t.refreshBufferWriter(buffer)
+		if err := t.refreshBufferWriter(buffer); err != nil {
+			t.clusterBufferLocks.Unlock(bufferId)
+			return err
+		}
 		t.clusterBufferLocks.Unlock(bufferId)

 		log.Info("currentMemorySize after flush buffer binlog",
@@ -772,7 +779,7 @@ func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) erro
 			zap.Uint64("WrittenMemorySize()", writer.WrittenMemorySize()),
 			zap.Int64("RowNum", writer.GetRowNum()))
 		future := t.flushPool.Submit(func() (any, error) {
-			err := t.flushBinlog(ctx, buffer, writer, pack)
+			err := t.flushBinlog(ctx, buffer, writer, false)
 			if err != nil {
 				return nil, err
 			}
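Read together, the two flushLargestBuffers hunks remove the second pack decision point: pack used to be computed under the buffer lock but consumed by flushBinlog inside the flushPool closure, which runs after the lock is released, so a memory-pressure flush could pack a segment that the size-triggered path was packing at the same time. The memory-pressure path now refreshes the writer with the non-packing refreshBufferWriter (unlocking before propagating any error) and always submits flushBinlog with pack=false.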
@@ -856,7 +863,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 		zap.Int64("row num", seg.GetNumOfRows()))

 	// clear segment binlogs cache
-	buffer.flushedBinlogs[writer.GetSegmentID()] = nil
+	delete(buffer.flushedBinlogs, writer.GetSegmentID())
 	//set old writer nil
 	writer = nil
 	return nil
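The switch from nil-assignment to delete is what makes the new leak check workable: assigning nil leaves the key in the map, so len(buffer.flushedBinlogs) stays non-zero and checkBuffersAfterCompaction would report a leak for a segment that was in fact packed. A standalone demonstration of the map semantics (hypothetical data, not the milvus code):

package main

import "fmt"

func main() {
	// Hypothetical stand-in for buffer.flushedBinlogs.
	flushed := map[int64][]string{42: {"binlog-a", "binlog-b"}}

	// Old behavior: nil-assignment keeps the key, so the buffer still
	// appears to hold un-packed binlogs.
	flushed[42] = nil
	fmt.Println(len(flushed)) // 1 -> would trip the leak check

	// New behavior: delete removes the key entirely.
	delete(flushed, 42)
	fmt.Println(len(flushed)) // 0 -> buffer is clean after packing
}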
@@ -1153,7 +1160,7 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
 	return buckets
 }

-func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (bool, error) {
+func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) {
 	var segmentID int64
 	var err error
 	var pack bool
@@ -1179,6 +1186,33 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b
 	return pack, nil
 }

+func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) error {
+	var segmentID int64
+	var err error
+	segmentID = buffer.writer.GetSegmentID()
+	buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize()))
+
+	writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID)
+	if err != nil {
+		return err
+	}
+
+	buffer.writer = writer
+	return nil
+}
+
 func (t *clusteringCompactionTask) GetSlotUsage() int64 {
 	return t.plan.GetSlotUsage()
 }
+
+func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error {
+	for _, buffer := range t.clusterBuffers {
+		if len(buffer.flushedBinlogs) != 0 {
+			log.Warn("there are some binlogs have leaked, please check", zap.Int("buffer id", buffer.id),
+				zap.Int64s("leak segments", lo.Keys(buffer.flushedBinlogs)))
+			log.Debug("leak binlogs", zap.Any("buffer flushedBinlogs", buffer.flushedBinlogs))
+			return fmt.Errorf("there are some binlogs have leaked")
+		}
+	}
+	return nil
+}
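The resulting invariant: every entry added to buffer.flushedBinlogs must be consumed by packBufferToSegment, which now deletes it; once mapping finishes, checkBuffersAfterCompaction treats any surviving entry as a leaked binlog and fails the compaction rather than silently dropping the data.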