Support stats and delta log SaveBinlog logic (#10156)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/10168/head
congqixia 2021-10-19 14:32:41 +08:00 committed by GitHub
parent 36a4900d1b
commit 6a886d63f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 7 deletions

View File

@ -154,7 +154,7 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
// `flushed` parameter indicating whether segment is flushed completely or partially
// `binlogs`, `checkpoints` and `statPositions` are persistence data for segment
func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
binlogs []*datapb.FieldBinlog, checkpoints []*datapb.CheckPoint,
binlogs, statslogs []*datapb.FieldBinlog, deltalogs []*datapb.DeltaLogInfo, checkpoints []*datapb.CheckPoint,
startPositions []*datapb.SegmentStartPosition) error {
m.Lock()
defer m.Unlock()
@ -184,7 +184,7 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
}
return nil
}
// binlogs
for _, tBinlogs := range binlogs {
fieldBinlogs := getFieldBinlogs(tBinlogs.GetFieldID(), currBinlogs)
if fieldBinlogs == nil {
@ -193,8 +193,21 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
fieldBinlogs.Binlogs = append(fieldBinlogs.Binlogs, tBinlogs.Binlogs...)
}
}
clonedSegment.Binlogs = currBinlogs
// statlogs
currStatsLogs := clonedSegment.GetStatslogs()
for _, tStatsLogs := range statslogs {
fieldStatsLog := getFieldBinlogs(tStatsLogs.GetFieldID(), currStatsLogs)
if fieldStatsLog == nil {
currStatsLogs = append(currStatsLogs, tStatsLogs)
} else {
fieldStatsLog.Binlogs = append(fieldStatsLog.Binlogs, tStatsLogs.Binlogs...)
}
}
clonedSegment.Statslogs = currStatsLogs
// deltalogs
clonedSegment.Deltalogs = append(clonedSegment.Deltalogs, deltalogs...)
modSegments[segmentID] = clonedSegment
for _, pos := range startPositions {

View File

@ -211,11 +211,14 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
assert.Nil(t, err)
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog0"}}}}}
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog0"}}},
Statslogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"statslog0"}}}}}
err = meta.AddSegment(segment1)
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, true, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog1"}}},
[]*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"statslog1"}}},
[]*datapb.DeltaLogInfo{{RecordEntries: 1, TimestampFrom: 100, TimestampTo: 200, DeltaLogSize: 1000}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.Nil(t, err)
@ -224,6 +227,8 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10,
StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}},
Binlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog0", "binlog1"}}},
Statslogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"statslog0", "statslog1"}}},
Deltalogs: []*datapb.DeltaLogInfo{{RecordEntries: 1, TimestampFrom: 100, TimestampTo: 200, DeltaLogSize: 1000}},
}}
assert.EqualValues(t, expected, updated)
})
@ -232,7 +237,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, false, nil, nil, nil)
err = meta.UpdateFlushSegmentsInfo(1, false, nil, nil, nil, nil, nil)
assert.Nil(t, err)
})
@ -244,7 +249,8 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
err = meta.AddSegment(segment1)
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, false, nil, []*datapb.CheckPoint{{SegmentID: 2, NumOfRows: 10}},
err = meta.UpdateFlushSegmentsInfo(1, false, nil, nil, nil, []*datapb.CheckPoint{{SegmentID: 2, NumOfRows: 10}},
[]*datapb.SegmentStartPosition{{SegmentID: 2, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.Nil(t, err)
assert.Nil(t, meta.GetSegment(2))
@ -266,6 +272,8 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
meta.segments.SetSegment(1, segmentInfo)
err = meta.UpdateFlushSegmentsInfo(1, true, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog"}}},
[]*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"statslog"}}},
[]*datapb.DeltaLogInfo{{RecordEntries: 1, TimestampFrom: 100, TimestampTo: 200, DeltaLogSize: 1000}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.NotNil(t, err)
assert.Equal(t, "mocked fail", err.Error())

View File

@ -311,7 +311,8 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
// set segment to SegmentState_Flushing and save binlogs and checkpoints
err := s.meta.UpdateFlushSegmentsInfo(req.GetSegmentID(), req.GetFlushed(),
req.GetField2BinlogPaths(), req.GetCheckPoints(), req.GetStartPositions())
req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(),
req.GetCheckPoints(), req.GetStartPositions())
if err != nil {
log.Error("save binlog and checkpoints failed",
zap.Int64("segmentID", req.GetSegmentID()),