fix duplicated handoff request (#20108)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/20125/head
Xiaofan 2022-10-27 15:15:32 +08:00 committed by GitHub
parent e470cd3dfa
commit def5972e01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 51 additions and 34 deletions

View File

@ -315,7 +315,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
clonedSegment.State = targetState clonedSegment.State = targetState
oldState := curSegInfo.GetState() oldState := curSegInfo.GetState()
if clonedSegment != nil && isSegmentHealthy(clonedSegment) { if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { if err := m.catalog.AlterSegment(m.ctx, clonedSegment.SegmentInfo, curSegInfo.SegmentInfo); err != nil {
log.Error("meta update: setting segment state - failed to alter segments", log.Error("meta update: setting segment state - failed to alter segments",
zap.Int64("segment ID", segmentID), zap.Int64("segment ID", segmentID),
zap.String("target state", targetState.String()), zap.String("target state", targetState.String()),
@ -353,7 +353,7 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
clonedSegment := curSegInfo.Clone() clonedSegment := curSegInfo.Clone()
clonedSegment.IsImporting = false clonedSegment.IsImporting = false
if isSegmentHealthy(clonedSegment) { if isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { if err := m.catalog.AlterSegment(m.ctx, clonedSegment.SegmentInfo, curSegInfo.SegmentInfo); err != nil {
log.Error("meta update: unsetting isImport state of segment - failed to unset segment isImporting state", log.Error("meta update: unsetting isImport state of segment - failed to unset segment isImporting state",
zap.Int64("segment ID", segmentID), zap.Int64("segment ID", segmentID),
zap.Error(err)) zap.Error(err))
@ -823,7 +823,7 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
// Persist segment updates first. // Persist segment updates first.
clonedSegment := curSegInfo.Clone(AddAllocation(allocation)) clonedSegment := curSegInfo.Clone(AddAllocation(allocation))
if clonedSegment != nil && isSegmentHealthy(clonedSegment) { if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { if err := m.catalog.AlterSegment(m.ctx, clonedSegment.SegmentInfo, curSegInfo.SegmentInfo); err != nil {
log.Error("meta update: add allocation failed", log.Error("meta update: add allocation failed",
zap.Int64("segment ID", segmentID), zap.Int64("segment ID", segmentID),
zap.Error(err)) zap.Error(err))

View File

@ -607,7 +607,7 @@ func TestSaveHandoffMeta(t *testing.T) {
info := &datapb.SegmentInfo{ info := &datapb.SegmentInfo{
ID: 100, ID: 100,
State: commonpb.SegmentState_Flushed, State: commonpb.SegmentState_Flushing,
} }
segmentInfo := &SegmentInfo{ segmentInfo := &SegmentInfo{
SegmentInfo: info, SegmentInfo: info,
@ -618,7 +618,20 @@ func TestSaveHandoffMeta(t *testing.T) {
keys, _, err := kvClient.LoadWithPrefix(util.FlushedSegmentPrefix) keys, _, err := kvClient.LoadWithPrefix(util.FlushedSegmentPrefix)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 0, len(keys))
newInfo := &datapb.SegmentInfo{
ID: 100,
State: commonpb.SegmentState_Flushed,
}
err = meta.catalog.AlterSegment(context.TODO(), newInfo, segmentInfo.SegmentInfo)
assert.Nil(t, err)
keys, _, err = kvClient.LoadWithPrefix(util.FlushedSegmentPrefix)
assert.Nil(t, err)
assert.Equal(t, 1, len(keys)) assert.Equal(t, 1, len(keys))
segmentID, err := strconv.ParseInt(filepath.Base(keys[0]), 10, 64) segmentID, err := strconv.ParseInt(filepath.Base(keys[0]), 10, 64)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 100, int(segmentID)) assert.Equal(t, 100, int(segmentID))

View File

@ -788,7 +788,11 @@ func (s *Server) startFlushLoop(ctx context.Context) {
return return
case segmentID := <-s.flushCh: case segmentID := <-s.flushCh:
//Ignore return error //Ignore return error
_ = s.postFlush(ctx, segmentID) log.Info("flush successfully", zap.Any("segmentID", segmentID))
err := s.postFlush(ctx, segmentID)
if err != nil {
log.Warn("failed to do post flush", zap.Any("segmentID", segmentID), zap.Error(err))
}
} }
} }
}() }()

View File

@ -72,10 +72,13 @@ func (t AlterType) String() string {
type DataCoordCatalog interface { type DataCoordCatalog interface {
ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error) ListSegments(ctx context.Context) ([]*datapb.SegmentInfo, error)
AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error AddSegment(ctx context.Context, segment *datapb.SegmentInfo) error
AddFakedSegment(ctx context.Context, segment *datapb.SegmentInfo) error // TODO Remove this later, we should update flush segments info for each segment separately, so far we still need transaction
AlterSegments(ctx context.Context, segments []*datapb.SegmentInfo) error AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error
// AlterSegmentsAndAddNewSegment for transaction // AlterSegmentsAndAddNewSegment for transaction
AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error
// TODO Remove this later, only a hack
AddFakedSegment(ctx context.Context, segment *datapb.SegmentInfo) error
AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error
SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error
DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error
MarkChannelDeleted(ctx context.Context, channel string) error MarkChannelDeleted(ctx context.Context, channel string) error

View File

@ -113,44 +113,41 @@ func (kc *Catalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo)
if err != nil { if err != nil {
return err return err
} }
return kc.Txn.MultiSave(kvs)
}
// save handoff req if segment is flushed func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error {
if segment.State == commonpb.SegmentState_Flushed { if len(newSegments) == 0 {
flushSegKey := buildFlushedSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID()) return nil
newSeg := &datapb.SegmentInfo{ID: segment.GetID()} }
segBytes, err := marshalSegmentInfo(newSeg)
kvs := make(map[string]string)
for _, segment := range newSegments {
segmentKvs, err := buildSegmentAndBinlogsKvs(segment)
if err != nil { if err != nil {
return err return err
} }
kvs[flushSegKey] = segBytes maps.Copy(kvs, segmentKvs)
} }
return kc.Txn.MultiSave(kvs) return kc.Txn.MultiSave(kvs)
} }
func (kc *Catalog) AlterSegments(ctx context.Context, modSegments []*datapb.SegmentInfo) error { func (kc *Catalog) AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error {
if len(modSegments) == 0 {
return nil
}
kvs := make(map[string]string) kvs := make(map[string]string)
for _, segment := range modSegments { segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment)
segmentKvs, err := buildSegmentAndBinlogsKvs(segment) if err != nil {
return err
}
maps.Copy(kvs, segmentKvs)
if newSegment.State == commonpb.SegmentState_Flushed && oldSegment.State != commonpb.SegmentState_Flushed {
flushSegKey := buildFlushedSegmentPath(newSegment.GetCollectionID(), newSegment.GetPartitionID(), newSegment.GetID())
newSeg := &datapb.SegmentInfo{ID: newSegment.GetID()}
segBytes, err := marshalSegmentInfo(newSeg)
if err != nil { if err != nil {
return err return err
} }
kvs[flushSegKey] = segBytes
maps.Copy(kvs, segmentKvs)
// save handoff req if segment is flushed
if segment.State == commonpb.SegmentState_Flushed {
flushSegKey := buildFlushedSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
newSeg := &datapb.SegmentInfo{ID: segment.GetID()}
segBytes, err := marshalSegmentInfo(newSeg)
if err != nil {
return err
}
kvs[flushSegKey] = segBytes
}
} }
return kc.Txn.MultiSave(kvs) return kc.Txn.MultiSave(kvs)

View File

@ -284,7 +284,7 @@ func Test_AddSegments(t *testing.T) {
_, ok := savedKvs[k4] _, ok := savedKvs[k4]
assert.False(t, ok) assert.False(t, ok)
assert.Equal(t, 5, len(savedKvs)) assert.Equal(t, 4, len(savedKvs))
verifySavedKvsForSegment(t, savedKvs) verifySavedKvsForSegment(t, savedKvs)
}) })
} }
@ -330,7 +330,7 @@ func Test_AlterSegments(t *testing.T) {
_, ok := savedKvs[k4] _, ok := savedKvs[k4]
assert.False(t, ok) assert.False(t, ok)
assert.Equal(t, 5, len(savedKvs)) assert.Equal(t, 4, len(savedKvs))
verifySavedKvsForSegment(t, savedKvs) verifySavedKvsForSegment(t, savedKvs)
}) })
} }