Fix save binlog path timeout bug (#9573)

issue: #9559
Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/9763/head
sunby 2021-10-12 23:46:35 +08:00 committed by GitHub
parent d8f490082c
commit e811a66527
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 119 additions and 13 deletions

View File

@ -164,15 +164,18 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
return nil
}
clonedSegment := segment.Clone()
kv := make(map[string]string)
modSegments := make(map[UniqueID]struct{})
modSegments := make(map[UniqueID]*SegmentInfo)
if flushed {
m.segments.SetState(segmentID, commonpb.SegmentState_Flushing)
modSegments[segmentID] = struct{}{}
clonedSegment.State = commonpb.SegmentState_Flushing
modSegments[segmentID] = clonedSegment
}
currBinlogs := segment.Clone().SegmentInfo.GetBinlogs()
currBinlogs := clonedSegment.GetBinlogs()
var getFieldBinlogs = func(id UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog {
for _, binlog := range binlogs {
if id == binlog.GetFieldID() {
@ -181,6 +184,7 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
}
return nil
}
for _, tBinlogs := range binlogs {
fieldBinlogs := getFieldBinlogs(tBinlogs.GetFieldID(), currBinlogs)
if fieldBinlogs == nil {
@ -189,32 +193,49 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
fieldBinlogs.Binlogs = append(fieldBinlogs.Binlogs, tBinlogs.Binlogs...)
}
}
m.segments.SetBinlogs(segmentID, currBinlogs)
modSegments[segmentID] = struct{}{}
clonedSegment.Binlogs = currBinlogs
modSegments[segmentID] = clonedSegment
for _, pos := range startPositions {
if len(pos.GetStartPosition().GetMsgID()) == 0 {
continue
}
m.segments.SetStartPosition(pos.GetSegmentID(), pos.GetStartPosition())
modSegments[segmentID] = struct{}{}
s := modSegments[pos.GetSegmentID()]
if s == nil {
s = m.segments.GetSegment(pos.GetSegmentID())
}
if s == nil {
continue
}
s.StartPosition = pos.GetStartPosition()
modSegments[pos.GetSegmentID()] = s
}
for _, cp := range checkpoints {
if segment.DmlPosition != nil && segment.DmlPosition.Timestamp >= cp.Position.Timestamp {
s := modSegments[cp.GetSegmentID()]
if s == nil {
s = m.segments.GetSegment(cp.GetSegmentID())
}
if s == nil {
continue
}
if s.DmlPosition != nil && s.DmlPosition.Timestamp >= cp.Position.Timestamp {
// segment position in etcd is larger than checkpoint, then dont change it
continue
}
m.segments.SetDmlPosition(cp.GetSegmentID(), cp.GetPosition())
m.segments.SetRowCount(cp.GetSegmentID(), cp.GetNumOfRows())
modSegments[segmentID] = struct{}{}
s.DmlPosition = cp.GetPosition()
s.NumOfRows = cp.GetNumOfRows()
modSegments[cp.GetSegmentID()] = s
}
for id := range modSegments {
if segment := m.segments.GetSegment(id); segment != nil {
segBytes, err := proto.Marshal(segment.SegmentInfo)
if err != nil {
log.Error("DataCoord UpdateFlushSegmentsInfo marshal failed", zap.Int64("segmentID", segment.GetID()), zap.Error(err))
return fmt.Errorf("DataCoord UpdateFlushSegmentsInfo segmentID:%d, marshal failed:%w", segment.GetID(), err)
}
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
@ -222,9 +243,18 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
}
}
if len(kv) == 0 {
return nil
}
if err := m.saveKvTxn(kv); err != nil {
return err
}
// update memory status
for id, s := range modSegments {
m.segments.SetSegment(id, s)
}
return nil
}

View File

@ -18,6 +18,7 @@ import (
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/stretchr/testify/assert"
)
@ -204,3 +205,74 @@ func TestGetUnFlushedSegments(t *testing.T) {
assert.EqualValues(t, 0, segments[0].ID)
assert.NotEqualValues(t, commonpb.SegmentState_Flushed, segments[0].State)
}
func TestUpdateFlushSegmentsInfo(t *testing.T) {
t.Run("normal", func(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
assert.Nil(t, err)
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog0"}}}}}
err = meta.AddSegment(segment1)
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, true, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog1"}}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.Nil(t, err)
updated := meta.GetSegment(1)
expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10,
StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}},
Binlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog0", "binlog1"}}},
}}
assert.EqualValues(t, expected, updated)
})
t.Run("update non-existed segment", func(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, false, nil, nil, nil)
assert.Nil(t, err)
})
t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
assert.Nil(t, err)
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}}
err = meta.AddSegment(segment1)
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, false, nil, []*datapb.CheckPoint{{SegmentID: 2, NumOfRows: 10}},
[]*datapb.SegmentStartPosition{{SegmentID: 2, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.Nil(t, err)
assert.Nil(t, meta.GetSegment(2))
})
t.Run("test save etcd failed", func(t *testing.T) {
kv := memkv.NewMemoryKV()
failedKv := &saveFailKV{kv}
meta, err := newMeta(failedKv)
assert.Nil(t, err)
segmentInfo := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
},
}
meta.segments.SetSegment(1, segmentInfo)
err = meta.UpdateFlushSegmentsInfo(1, true, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog"}}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.NotNil(t, err)
assert.Equal(t, "mocked fail", err.Error())
segmentInfo = meta.GetSegment(1)
assert.EqualValues(t, 0, segmentInfo.NumOfRows)
assert.Equal(t, commonpb.SegmentState_Growing, segmentInfo.State)
assert.Nil(t, segmentInfo.Binlogs)
assert.Nil(t, segmentInfo.StartPosition)
})
}

View File

@ -76,6 +76,10 @@ func (kv *saveFailKV) Save(key, value string) error {
return errors.New("mocked fail")
}
func (kv *saveFailKV) MultiSave(kvs map[string]string) error {
return errors.New("mocked fail")
}
// a mock kv that always fail when do `Remove`
type removeFailKV struct{ kv.TxnKV }