Fix compacted segment not dropped (#20128)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Xiaofan 2022-10-27 17:15:32 +08:00 committed by GitHub
parent bee66631e3
commit 11efa0bb5f
8 changed files with 128 additions and 64 deletions
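
The bug: when a merge compaction produced an empty segment (NumOfRows == 0), the handler took a side path that persisted a "fake" segment but never marked the compacted-from segments as Dropped, so they lingered in meta forever. The fix removes the AddFakedSegment hack and routes both outcomes through the single compaction meta transaction; the catalog then decides whether to write full segment meta or only a flushed-segment marker. A minimal self-contained sketch of the before/after semantics — all names here are illustrative toys, not the real datacoord API:

package main

import "fmt"

type seg struct{ dropped bool }

// oldHandle sketches the buggy flow: an empty result only saved a fake
// segment and skipped the meta update that drops the inputs.
func oldHandle(meta map[int64]*seg, from []int64, rows int64) {
	if rows > 0 {
		for _, id := range from {
			meta[id].dropped = true
		}
		// persist the new segment meta here
	}
	// else: AddFakedSegment wrote a flushed marker; inputs stayed alive
}

// newHandle sketches the fixed flow: inputs are always dropped in the
// same transaction, regardless of whether the result is empty.
func newHandle(meta map[int64]*seg, from []int64, rows int64) {
	for _, id := range from {
		meta[id].dropped = true
	}
	// rows > 0: full segment meta; rows == 0: only a flushed marker
}

func main() {
	before, after := map[int64]*seg{1: {}, 2: {}}, map[int64]*seg{1: {}, 2: {}}
	oldHandle(before, []int64{1, 2}, 0)
	newHandle(after, []int64{1, 2}, 0)
	fmt.Println(before[1].dropped, after[1].dropped) // false true
}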


@@ -255,24 +255,9 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
 	}
 	log.Info("handleCompactionResult: altering metastore after compaction")
-	if newSegment.GetNumOfRows() > 0 {
-		if err := c.meta.alterMetaStoreAfterCompaction(modInfos, newSegment.SegmentInfo); err != nil {
-			log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err))
-			return fmt.Errorf("fail to alter metastore after compaction, err=%w", err)
-		}
-	} else {
-		log.Warn("compaction produced an empty segment", zap.Int64("segmentID", newSegment.GetID()))
-		fakedSegment := &datapb.SegmentInfo{
-			ID:                  newSegment.GetID(),
-			CollectionID:        newSegment.GetCollectionID(),
-			PartitionID:         newSegment.GetPartitionID(),
-			CompactionFrom:      newSegment.GetCompactionFrom(),
-			CreatedByCompaction: true,
-			IsFake:              true,
-		}
-		if err := c.meta.AddFakedSegment(fakedSegment); err != nil {
-			return fmt.Errorf("fail to save fake segment after compaction, err=%w", err)
-		}
-	}
+	if err := c.meta.alterMetaStoreAfterCompaction(modInfos, newSegment.SegmentInfo); err != nil {
+		log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err))
+		return fmt.Errorf("fail to alter metastore after compaction, err=%w", err)
+	}
 
 	var nodeID = c.plans[plan.GetPlanID()].dataNodeID
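
With the else branch gone, the empty-result case no longer performs its own standalone write; everything rides on alterMetaStoreAfterCompaction, whose catalog implementation (last hunk below) folds all keys into one MultiSave. A toy of why that matters — a single multi-put cannot leave the inputs half-dropped the way two independent writes could. The txn type and keys below are illustrative; the real store is the metastore's Txn kv:

package main

import "fmt"

// memTxn stands in for the metastore's Txn kv; in the real etcd-backed
// store, MultiSave applies atomically: all keys or none.
type memTxn map[string]string

func (t memTxn) MultiSave(kvs map[string]string) error {
	for k, v := range kvs {
		t[k] = v
	}
	return nil
}

func main() {
	store := memTxn{}
	// One transaction carries both the Dropped inputs and the marker for
	// the empty result, so a crash cannot strand undropped segments.
	_ = store.MultiSave(map[string]string{
		"segment/1":         "state=Dropped",
		"segment/2":         "state=Dropped",
		"flushed-segment/3": "<marshaled empty segment>",
	})
	fmt.Println(len(store)) // 3
}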


@@ -471,6 +471,114 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
 		assert.Equal(t, compactionResult.GetSegmentID(), segID)
 		assert.NoError(t, err)
 	})
+
+	t.Run("test empty result merge compaction task", func(t *testing.T) {
+		mockDataNode := &mocks.DataNode{}
+		mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
+
+		dataNodeID := UniqueID(111)
+
+		seg1 := &datapb.SegmentInfo{
+			ID:        1,
+			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log1")},
+			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log2")},
+			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log3")},
+		}
+
+		seg2 := &datapb.SegmentInfo{
+			ID:        2,
+			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log4")},
+			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log5")},
+			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log6")},
+		}
+
+		plan := &datapb.CompactionPlan{
+			PlanID: 1,
+			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+				{
+					SegmentID:           seg1.ID,
+					FieldBinlogs:        seg1.GetBinlogs(),
+					Field2StatslogPaths: seg1.GetStatslogs(),
+					Deltalogs:           seg1.GetDeltalogs(),
+				},
+				{
+					SegmentID:           seg2.ID,
+					FieldBinlogs:        seg2.GetBinlogs(),
+					Field2StatslogPaths: seg2.GetStatslogs(),
+					Deltalogs:           seg2.GetDeltalogs(),
+				},
+			},
+			Type: datapb.CompactionType_MergeCompaction,
+		}
+
+		sessions := &SessionManager{
+			sessions: struct {
+				sync.RWMutex
+				data map[int64]*Session
+			}{
+				data: map[int64]*Session{
+					dataNodeID: {client: mockDataNode}},
+			},
+		}
+
+		task := &compactionTask{
+			triggerInfo: &compactionSignal{id: 1},
+			state:       executing,
+			plan:        plan,
+			dataNodeID:  dataNodeID,
+		}
+
+		plans := map[int64]*compactionTask{1: task}
+
+		meta := &meta{
+			catalog: &datacoord.Catalog{Txn: memkv.NewMemoryKV()},
+			segments: &SegmentsInfo{
+				map[int64]*SegmentInfo{
+					seg1.ID: {SegmentInfo: seg1},
+					seg2.ID: {SegmentInfo: seg2},
+				},
+			},
+		}
+		meta.AddSegment(NewSegmentInfo(seg1))
+		meta.AddSegment(NewSegmentInfo(seg2))
+
+		segments := meta.GetAllSegmentsUnsafe()
+		assert.Equal(t, len(segments), 2)
+
+		compactionResult := datapb.CompactionResult{
+			PlanID:              1,
+			SegmentID:           3,
+			NumOfRows:           0,
+			InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log301")},
+			Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log302")},
+			Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogPaths(101, "log303")},
+		}
+
+		flushCh := make(chan UniqueID, 1)
+		c := &compactionPlanHandler{
+			plans:    plans,
+			sessions: sessions,
+			meta:     meta,
+			flushCh:  flushCh,
+			segRefer: &SegmentReferenceManager{
+				segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{},
+			},
+		}
+
+		err := c.completeCompaction(&compactionResult)
+
+		segID, ok := <-flushCh
+		assert.True(t, ok)
+		assert.Equal(t, compactionResult.GetSegmentID(), segID)
+		assert.NoError(t, err)
+
+		segments = meta.GetAllSegmentsUnsafe()
+		assert.Equal(t, len(segments), 2)
+		for _, segment := range segments {
+			assert.True(t, segment.State == commonpb.SegmentState_Dropped)
+		}
+	})
 }
 
 func Test_compactionPlanHandler_getCompaction(t *testing.T) {
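
The new test exercises exactly the failure mode from the issue title: a merge compaction whose result has NumOfRows == 0. It expects completeCompaction to succeed, the result's segment ID to still arrive on flushCh, and, crucially, both input segments to end up Dropped with no third segment added to meta. Condensed as a self-contained check — toy states only; the real ones are commonpb.SegmentState values:

package main

import "fmt"

func main() {
	// State of the toy meta after compacting segments 1 and 2 into an
	// empty segment 3, mirroring the test's expectations.
	metaAfter := map[int64]string{1: "Dropped", 2: "Dropped"}
	flushCh := make(chan int64, 1)
	flushCh <- 3 // completeCompaction still notifies the flush loop

	fmt.Println(len(metaAfter) == 2)    // true: no segment 3 meta was written
	fmt.Println((<-flushCh) == 3)       // true: flush notification observed
	for _, state := range metaAfter {
		fmt.Println(state == "Dropped") // true, true: inputs were dropped
	}
}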


@@ -228,20 +228,6 @@ func (m *meta) AddSegment(segment *SegmentInfo) error {
 	return nil
 }
 
-// AddFakedSegment persist a faked segment into meta
-func (m *meta) AddFakedSegment(fakedSegment *datapb.SegmentInfo) error {
-	m.Lock()
-	defer m.Unlock()
-	if err := m.catalog.AddFakedSegment(m.ctx, fakedSegment); err != nil {
-		log.Error("adding faked segment failed",
-			zap.Any("segment", fakedSegment),
-			zap.Error(err))
-		return err
-	}
-	log.Debug("add faked segment finished", zap.Any("segment", fakedSegment))
-	return nil
-}
-
 // DropSegment remove segment with provided id, etcd persistence also removed
 func (m *meta) DropSegment(segmentID UniqueID) error {
 	log.Info("meta update: dropping segment",


@@ -359,8 +359,6 @@ func TestMeta_Basic(t *testing.T) {
 		assert.NotNil(t, info1_1)
 		assert.Equal(t, false, info1_1.GetIsImporting())
-
-		err = meta.AddFakedSegment(segInfo1_1.SegmentInfo)
-		assert.NoError(t, err)
 	})
 
 	t.Run("Test segment with kv fails", func(t *testing.T) {
@@ -388,8 +386,6 @@ func TestMeta_Basic(t *testing.T) {
 		meta, err = newMeta(context.TODO(), fkv, "")
 		assert.Nil(t, err)
-
-		err = meta.AddFakedSegment(&datapb.SegmentInfo{IsFake: true})
-		assert.NotNil(t, err)
 	})
 
 	t.Run("Test GetCount", func(t *testing.T) {


@@ -805,8 +805,7 @@ func (s *Server) startFlushLoop(ctx context.Context) {
 func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
 	segment := s.meta.GetSegment(segmentID)
 	if segment == nil {
-		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
-		return errors.New("segment not found")
+		return errors.New("segment not found, might be a faked segment, ignore post flush")
 	}
 	// set segment to SegmentState_Flushed
 	if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {


@@ -2693,7 +2693,7 @@ func TestPostFlush(t *testing.T) {
 		defer closeTestServer(t, svr)
 		err := svr.postFlush(context.Background(), 1)
-		assert.EqualValues(t, errors.New("segment not found"), err)
+		assert.EqualValues(t, errors.New("segment not found, might be a faked segment, ignore post flush"), err)
 	})
 
 	t.Run("success post flush", func(t *testing.T) {


@@ -76,8 +76,6 @@ type DataCoordCatalog interface {
 	AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error
 	// AlterSegmentsAndAddNewSegment for transaction
 	AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error
-	// TODO Remove this later, only a hack
-	AddFakedSegment(ctx context.Context, segment *datapb.SegmentInfo) error
 	AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error
 	SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error
 	DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error


@@ -91,23 +91,6 @@ func (kc *Catalog) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, err
 	return segments, nil
 }
 
-func (kc *Catalog) AddFakedSegment(ctx context.Context, segment *datapb.SegmentInfo) error {
-	if !segment.IsFake {
-		return nil
-	}
-
-	// The fake segment will not be saved into meta, it only needs process handoff case
-	kvs := make(map[string]string)
-	flushSegKey := buildFlushedSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
-	clonedSegment := proto.Clone(segment).(*datapb.SegmentInfo)
-	segBytes, err := marshalSegmentInfo(clonedSegment)
-	if err != nil {
-		return err
-	}
-	kvs[flushSegKey] = segBytes
-	return kc.Txn.MultiSave(kvs)
-}
-
 func (kc *Catalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error {
 	kvs, err := buildSegmentAndBinlogsKvs(segment)
 	if err != nil {
@@ -209,14 +192,23 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [
 	}
 
 	if newSegment != nil {
-		segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment)
-		if err != nil {
-			return err
+		if newSegment.GetNumOfRows() > 0 {
+			segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment)
+			if err != nil {
+				return err
+			}
+			maps.Copy(kvs, segmentKvs)
+		} else {
+			// should be a faked segment, we create flush path directly here
+			flushSegKey := buildFlushedSegmentPath(newSegment.GetCollectionID(), newSegment.GetPartitionID(), newSegment.GetID())
+			clonedSegment := proto.Clone(newSegment).(*datapb.SegmentInfo)
+			segBytes, err := marshalSegmentInfo(clonedSegment)
+			if err != nil {
+				return err
+			}
+			kvs[flushSegKey] = segBytes
 		}
-		maps.Copy(kvs, segmentKvs)
 	}
 	return kc.Txn.MultiSave(kvs)
 }
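
For the empty-result branch above, the only key written for the new segment is the flushed-segment path, so the handoff watcher still observes a flushed segment while no queryable segment meta is created. A sketch of the key shape follows; the prefix is a placeholder, since the real layout is internal to buildFlushedSegmentPath:

package main

import (
	"fmt"
	"path"
)

// flushedSegmentPathSketch mimics buildFlushedSegmentPath; the prefix is
// a placeholder, not the catalog's actual key prefix.
func flushedSegmentPathSketch(collID, partID, segID int64) string {
	return path.Join("datacoord-meta/flushed-segment",
		fmt.Sprint(collID), fmt.Sprint(partID), fmt.Sprint(segID))
}

func main() {
	kvs := map[string]string{}
	// The value is the marshaled SegmentInfo in the real code.
	kvs[flushedSegmentPathSketch(100, 200, 3)] = "<segment bytes>"
	fmt.Println(kvs) // exactly one key; no full segment meta entry
}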