mirror of https://github.com/milvus-io/milvus.git
fix: Skip statslog generation flushing empty L0 segment (#28733)
See also #27675. When an L0 segment contains only delta data, the merged statslog shall be skipped when performing the sync task. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> pull/28741/head
parent
d344336a13
commit
8a9ab69369
|
@ -31,6 +31,7 @@ type SyncTask struct {
|
|||
insertData *storage.InsertData
|
||||
deleteData *storage.DeleteData
|
||||
|
||||
segment *metacache.SegmentInfo
|
||||
collectionID int64
|
||||
partitionID int64
|
||||
segmentID int64
|
||||
|
@ -81,24 +82,25 @@ func (t *SyncTask) handleError(err error) {
|
|||
func (t *SyncTask) Run() error {
|
||||
log := t.getLogger()
|
||||
var err error
|
||||
var has bool
|
||||
|
||||
segment, has := t.metacache.GetSegmentByID(t.segmentID)
|
||||
t.segment, has = t.metacache.GetSegmentByID(t.segmentID)
|
||||
if !has {
|
||||
log.Warn("failed to sync data, segment not found in metacache")
|
||||
t.handleError(err)
|
||||
return merr.WrapErrSegmentNotFound(t.segmentID)
|
||||
}
|
||||
|
||||
if segment.CompactTo() == metacache.NullSegment {
|
||||
if t.segment.CompactTo() == metacache.NullSegment {
|
||||
log.Info("segment compacted to zero-length segment, discard sync task")
|
||||
return nil
|
||||
}
|
||||
|
||||
if segment.CompactTo() > 0 {
|
||||
log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", segment.CompactTo()))
|
||||
if t.segment.CompactTo() > 0 {
|
||||
log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", t.segment.CompactTo()))
|
||||
// update sync task segment id
|
||||
// it's ok to use compactTo segmentID here, since there shall be no insert for compacted segment
|
||||
t.segmentID = segment.CompactTo()
|
||||
t.segmentID = t.segment.CompactTo()
|
||||
}
|
||||
|
||||
err = t.serializeInsertData()
|
||||
|
@ -322,7 +324,9 @@ func (t *SyncTask) serializePkStatsLog() error {
|
|||
}
|
||||
}
|
||||
|
||||
if t.isFlush {
|
||||
// skip statslog for empty segment
|
||||
// DO NOT use level check here since Level zero segment may contain insert data in the future
|
||||
if t.isFlush && t.segment.NumOfRows() > 0 {
|
||||
return t.serializeMergedPkStats(fieldID, pkField.GetDataType())
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -239,6 +239,31 @@ func (s *SyncTaskSuite) TestRunNormal() {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *SyncTaskSuite) TestRunL0Segment() {
|
||||
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
|
||||
bfs := metacache.NewBloomFilterSet()
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0}, bfs)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
s.Run("pure_delete_l0_flush", func() {
|
||||
task := s.getSuiteSyncTask()
|
||||
task.WithDeleteData(s.getDeleteBuffer())
|
||||
task.WithTimeRange(50, 100)
|
||||
task.WithMetaWriter(BrokerMetaWriter(s.broker))
|
||||
task.WithCheckpoint(&msgpb.MsgPosition{
|
||||
ChannelName: s.channelName,
|
||||
MsgID: []byte{1, 2, 3, 4},
|
||||
Timestamp: 100,
|
||||
})
|
||||
task.WithFlush()
|
||||
|
||||
err := task.Run()
|
||||
s.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *SyncTaskSuite) TestCompactToNull() {
|
||||
bfs := metacache.NewBloomFilterSet()
|
||||
fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{
|
||||
|
|
|
@ -46,12 +46,25 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
|
|||
defer wb.mut.Unlock()
|
||||
|
||||
// process insert msgs
|
||||
_, err := wb.bufferInsert(insertMsgs, startPos, endPos)
|
||||
pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos)
|
||||
if err != nil {
|
||||
log.Warn("failed to buffer insert data", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// update pk oracle
|
||||
for segmentID, dataList := range pkData {
|
||||
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
|
||||
for _, segment := range segments {
|
||||
for _, fieldData := range dataList {
|
||||
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, msg := range deleteMsgs {
|
||||
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
|
||||
pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/datanode/metacache"
|
||||
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
|
@ -153,6 +154,8 @@ func (s *L0WriteBufferSuite) TestBufferData() {
|
|||
pks, msg := s.composeInsertMsg(1000, 10, 128)
|
||||
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
|
||||
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
|
||||
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -144,7 +145,7 @@ func (insertCodec *InsertCodec) SerializePkStats(stats *PrimaryKeyStats, rowNum
|
|||
// Serialize Pk stats list to one blob
|
||||
func (insertCodec *InsertCodec) SerializePkStatsList(stats []*PrimaryKeyStats, rowNum int64) (*Blob, error) {
|
||||
if len(stats) == 0 {
|
||||
return nil, nil
|
||||
return nil, merr.WrapErrServiceInternal("shall not serialize zero length statslog list")
|
||||
}
|
||||
|
||||
blobKey := fmt.Sprintf("%d", stats[0].FieldID)
|
||||
|
|
|
@ -417,6 +417,9 @@ func TestInsertCodec(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
_, err = DeserializeStats([]*Blob{statsBlob2})
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = insertCodec.SerializePkStatsList([]*PrimaryKeyStats{}, 0)
|
||||
assert.Error(t, err, "SerializePkStatsList zero length pkstats list shall return error")
|
||||
}
|
||||
|
||||
func TestDeleteCodec(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue