Modify lastSyncTime in advance to prevent multiple flush binlogs (#22048) (#22088)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
cai.zhang 2023-02-12 18:56:33 +08:00 committed by GitHub
parent 84d6fc1d46
commit 511265c68c
6 changed files with 108 additions and 73 deletions


@@ -138,79 +138,27 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
 			unIndexedIDs.Insert(s.GetID())
 		}
 	}
-	for id := range unIndexedIDs {
-		// Indexed segments are compacted to a raw segment,
-		// replace it with the indexed ones
-		if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
-			indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
-			unIndexedIDs.Remove(id)
-			indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
-			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
-		}
-	}
+	hasUnIndexed := true
+	for hasUnIndexed {
+		hasUnIndexed = false
+		for id := range unIndexedIDs {
+			// Indexed segments are compacted to a raw segment,
+			// replace it with the indexed ones
+			if len(segmentInfos[id].GetCompactionFrom()) > 0 {
+				unIndexedIDs.Remove(id)
+				for _, segID := range segmentInfos[id].GetCompactionFrom() {
+					if indexed.Contain(segID) {
+						indexedIDs.Insert(segID)
+					} else {
+						unIndexedIDs.Insert(segID)
+						hasUnIndexed = true
+					}
+				}
+				droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
+			}
+		}
+	}
-	//var (
-	//	indexedIDs   = make(typeutil.UniqueSet)
-	//	unIndexedIDs = make(typeutil.UniqueSet)
-	//	growingIDs   = make(typeutil.UniqueSet)
-	//	droppedIDs   = make(typeutil.UniqueSet)
-	//)
-	//for _, s := range segments {
-	//	if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
-	//		(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
-	//		continue
-	//	}
-	//	if s.GetIsImporting() {
-	//		// Skip bulk insert segments.
-	//		continue
-	//	}
-	//	segmentInfos[s.GetID()] = s
-	//	if s.GetState() == commonpb.SegmentState_Dropped {
-	//		droppedIDs.Insert(s.GetID())
-	//	} else if s.GetState() == commonpb.SegmentState_Growing {
-	//		growingIDs.Insert(s.GetID())
-	//	} else if indexed.Contain(s.GetID()) {
-	//		indexedIDs.Insert(s.GetID())
-	//	} else {
-	//		unIndexedIDs.Insert(s.GetID())
-	//	}
-	//}
-	//hasUnIndexed := true
-	//for hasUnIndexed {
-	//	hasUnIndexed = false
-	//	for id := range unIndexedIDs {
-	//		// Indexed segments are compacted to a raw segment,
-	//		// replace it with the indexed ones
-	//		if indexed.Contain(id) {
-	//			unIndexedIDs.Remove(id)
-	//			indexedIDs.Insert(id)
-	//			continue
-	//		}
-	//		if len(segmentInfos[id].GetCompactionFrom()) > 0 {
-	//			unIndexedIDs.Remove(id)
-	//			for _, segID := range segmentInfos[id].GetCompactionFrom() {
-	//				if indexed.Contain(segID) {
-	//					indexedIDs.Insert(segID)
-	//				} else {
-	//					unIndexedIDs.Insert(id)
-	//					hasUnIndexed = true
-	//				}
-	//			}
-	//			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
-	//		}
-	//	}
-	//}
-	//
-	//return &datapb.VchannelInfo{
-	//	CollectionID:        channel.CollectionID,
-	//	ChannelName:         channel.Name,
-	//	SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
-	//	IndexedSegmentIds:   indexed.Collect(),
-	//	FlushedSegmentIds:   unIndexedIDs.Collect(),
-	//	UnflushedSegmentIds: growingIDs.Collect(),
-	//	DroppedSegmentIds:   droppedIDs.Collect(),
-	//}
 	return &datapb.VchannelInfo{
 		CollectionID: channel.CollectionID,
 		ChannelName:  channel.Name,

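For context before the next hunk: the old code replaced a compacted segment with its sources only when every source was already indexed, so a multi-level compaction chain could stop resolving after one step. The new loop runs to a fixed point, expanding any unindexed segment that has compaction sources until only leaves remain. Below is a minimal standalone sketch of that expansion, with a hypothetical `segment` type and plain maps standing in for `typeutil.UniqueSet`; it illustrates the idea rather than reproducing the Milvus code.

```go
package main

import "fmt"

// segment is a hypothetical stand-in for datapb.SegmentInfo: an ID plus the
// IDs of the segments it was compacted from (empty for a leaf segment).
type segment struct {
	ID             int64
	CompactionFrom []int64
}

// resolve expands unindexed segments into their compaction sources until a
// fixed point is reached: every remaining unindexed segment is a leaf.
func resolve(segments map[int64]*segment, indexed, unIndexed map[int64]bool) {
	hasUnIndexed := true
	for hasUnIndexed {
		hasUnIndexed = false
		for id := range unIndexed {
			from := segments[id].CompactionFrom
			if len(from) == 0 {
				continue // a leaf: keep it as unindexed
			}
			// Replace the compacted segment with its sources.
			delete(unIndexed, id)
			for _, srcID := range from {
				if indexed[srcID] {
					continue // this source is already served by an index
				}
				unIndexed[srcID] = true
				hasUnIndexed = true // a source may itself be compacted: re-scan
			}
		}
	}
}

func main() {
	// The chain from the new test below: 9,10 -> 11 and 11,12 -> 13.
	segments := map[int64]*segment{
		9: {ID: 9}, 10: {ID: 10}, 12: {ID: 12},
		11: {ID: 11, CompactionFrom: []int64{9, 10}},
		13: {ID: 13, CompactionFrom: []int64{11, 12}},
	}
	indexed := map[int64]bool{12: true}
	unIndexed := map[int64]bool{13: true}
	resolve(segments, indexed, unIndexed)
	fmt.Println(unIndexed) // map[9:true 10:true]: 13 -> {11,12}, then 11 -> {9,10}
}
```

Termination follows from compaction lineage being acyclic: every pass either expands a compacted segment toward the leaves or inserts nothing, and a pass that inserts nothing ends the loop.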

@@ -2697,6 +2697,87 @@ func TestGetRecoveryInfo(t *testing.T) {
 		assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
 	})
+	t.Run("with continuous compaction", func(t *testing.T) {
+		svr := newTestServer(t, nil)
+		defer closeTestServer(t, svr)
+		svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
+			return newMockRootCoordService(), nil
+		}
+		svr.meta.AddCollection(&collectionInfo{
+			ID:     0,
+			Schema: newTestSchema(),
+		})
+		err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{
+			ChannelName: "vchan1",
+			Timestamp:   0,
+		})
+		assert.NoError(t, err)
+		seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped)
+		seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
+		seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
+		seg3.CompactionFrom = []int64{9, 10}
+		seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
+		seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed)
+		seg5.CompactionFrom = []int64{11, 12}
+		err = svr.meta.AddSegment(NewSegmentInfo(seg1))
+		assert.Nil(t, err)
+		err = svr.meta.AddSegment(NewSegmentInfo(seg2))
+		assert.Nil(t, err)
+		err = svr.meta.AddSegment(NewSegmentInfo(seg3))
+		assert.Nil(t, err)
+		err = svr.meta.AddSegment(NewSegmentInfo(seg4))
+		assert.Nil(t, err)
+		err = svr.meta.AddSegment(NewSegmentInfo(seg5))
+		assert.Nil(t, err)
+		err = svr.meta.CreateIndex(&model.Index{
+			TenantID:        "",
+			CollectionID:    0,
+			FieldID:         2,
+			IndexID:         0,
+			IndexName:       "_default_idx_2",
+			IsDeleted:       false,
+			CreateTime:      0,
+			TypeParams:      nil,
+			IndexParams:     nil,
+			IsAutoIndex:     false,
+			UserIndexParams: nil,
+		})
+		assert.Nil(t, err)
+		svr.meta.segments.SetSegmentIndex(seg4.ID, &model.SegmentIndex{
+			SegmentID:     seg4.ID,
+			CollectionID:  0,
+			PartitionID:   0,
+			NumRows:       100,
+			IndexID:       0,
+			BuildID:       0,
+			NodeID:        0,
+			IndexVersion:  1,
+			IndexState:    commonpb.IndexState_Finished,
+			FailReason:    "",
+			IsDeleted:     false,
+			CreateTime:    0,
+			IndexFileKeys: nil,
+			IndexSize:     0,
+		})
+		req := &datapb.GetRecoveryInfoRequest{
+			CollectionID: 0,
+			PartitionID:  0,
+		}
+		resp, err := svr.GetRecoveryInfo(context.TODO(), req)
+		assert.Nil(t, err)
+		assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
+		assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
+		assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
+		assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0)
+		assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds())
+		assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds())
+	})
 	t.Run("with closed server", func(t *testing.T) {
 		svr := newTestServer(t, nil)
 		closeTestServer(t, svr)

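Reading the new assertions against the handler change above: segment 13 (flushed, compacted from 11 and 12) gets expanded; 12 has a finished index, so it surfaces in FlushedSegmentIds, while unindexed 11 expands further into 9 and 10, which surface in UnflushedSegmentIds so their data can be replayed. Nothing in the chain is reported as dropped, even though segments 9, 10, 11 and 12 are all in the Dropped state, because the expansion removes each consumed compaction source from the dropped set. This is exactly the fixed point traced in the sketch after the first hunk.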

@@ -483,6 +483,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
 		segmentsToSync = append(segmentsToSync, task.segmentID)
 		ibNode.channel.rollInsertBuffer(task.segmentID)
 		ibNode.channel.RollPKstats(task.segmentID, pkStats)
+		ibNode.channel.setSegmentLastSyncTs(task.segmentID, endPosition.GetTimestamp())
 		metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc()
 		metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TotalLabel).Inc()
 		if task.auto {

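The single added line above is the heart of this PR: the segment's last-sync timestamp is now stamped when the sync task is handed off in Sync(), instead of when the flush eventually completes (the matching removal in flushNotifyFunc is below). Here is a rough sketch of the mechanism with a hypothetical `channelMeta` type using time.Time; the real channel keeps TSO uint64 timestamps, and only the method names mirror setSegmentLastSyncTs and listSegmentIDsToSync.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// channelMeta is a hypothetical stand-in for the DataNode channel: it tracks,
// per segment, when that segment's buffer was last handed to a sync task.
type channelMeta struct {
	mu           sync.Mutex
	lastSyncTime map[int64]time.Time
}

func (c *channelMeta) setSegmentLastSyncTs(segID int64, t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lastSyncTime[segID] = t
}

// listSegmentIDsToSync returns the segments whose buffers have not been
// synced for longer than staleDuration as of ts.
func (c *channelMeta) listSegmentIDsToSync(ts time.Time, staleDuration time.Duration) []int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	var ids []int64
	for segID, last := range c.lastSyncTime {
		if ts.Sub(last) >= staleDuration {
			ids = append(ids, segID)
		}
	}
	return ids
}

func main() {
	c := &channelMeta{lastSyncTime: make(map[int64]time.Time)}

	// Segment 1 last synced 11 minutes ago: the policy tick selects it.
	c.setSegmentLastSyncTs(1, time.Now().Add(-11*time.Minute))
	fmt.Println(c.listSegmentIDsToSync(time.Now(), 10*time.Minute)) // [1]

	// The fix: stamp the segment the moment its sync task is queued. Any
	// tick firing while the flush is still running now skips segment 1.
	c.setSegmentLastSyncTs(1, time.Now())
	fmt.Println(c.listSegmentIDsToSync(time.Now(), 10*time.Minute)) // []
}
```

The design point is ordering: once the task is queued, any flush-policy tick that fires while the flush is still in flight sees a fresh timestamp and skips the segment instead of flushing the same buffer again.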

@@ -38,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
 	"github.com/milvus-io/milvus/internal/util/paramtable"
 	"github.com/milvus-io/milvus/internal/util/retry"
+	"github.com/milvus-io/milvus/internal/util/tsoutil"
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
@@ -228,7 +229,12 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	}
 	inMsg := genFlowGraphInsertMsg(insertChannelName)
 	assert.NotPanics(t, func() { iBNode.Operate([]flowgraph.Msg{&inMsg}) })
+	iBNode.channel.setSegmentLastSyncTs(UniqueID(1), tsoutil.ComposeTSByTime(time.Now().Add(-11*time.Minute), 0))
+	assert.NotPanics(t, func() {
+		res := iBNode.Operate([]flowgraph.Msg{&inMsg})
+		assert.Subset(t, res[0].(*flowGraphMsg).segmentsToSync, []UniqueID{1})
+	})
+	assert.NotSubset(t, iBNode.channel.listSegmentIDsToSync(tsoutil.ComposeTSByTime(time.Now(), 0)), []UniqueID{1})
 	resendTTChan <- resendTTMsg{
 		segmentIDs: []int64{0, 1, 2},

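The updated test drives that path by composing a hybrid timestamp 11 minutes in the past, beyond what is presumably a 10-minute staleness window. tsoutil.ComposeTSByTime builds the TSO-style hybrid timestamps Milvus uses throughout; the sketch below reimplements the conventional layout (physical milliseconds in the high bits, an 18-bit logical counter in the low bits) purely for illustration, so treat the constant as an assumption rather than a copy of tsoutil.

```go
package main

import (
	"fmt"
	"time"
)

// logicalBits is the conventional TSO split: the low 18 bits carry a logical
// counter and the rest carry physical milliseconds. Assumed to match what
// tsoutil uses; verify against the real constant before relying on it.
const logicalBits = 18

// composeTSByTime mirrors how tsoutil.ComposeTSByTime is used in the test:
// build one hybrid timestamp from wall-clock time plus a logical counter.
func composeTSByTime(t time.Time, logical int64) uint64 {
	return uint64(t.UnixMilli())<<logicalBits | uint64(logical)
}

func main() {
	past := composeTSByTime(time.Now().Add(-11*time.Minute), 0)
	now := composeTSByTime(time.Now(), 0)
	// Physical time sits in the high bits, so plain integer comparison
	// preserves wall-clock order; staleness checks can compare directly.
	fmt.Println(now > past) // true
}
```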

@@ -864,6 +864,5 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMetaFunc {
 		dsService.flushingSegCache.Remove(req.GetSegmentID())
 		dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos)
 		dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos)
-		dsService.channel.setSegmentLastSyncTs(req.GetSegmentID(), pack.pos.GetTimestamp())
 	}
 }

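This removal is the counterpart of the addition in Sync() above. The callback runs only once the flush has fully completed, and it used to be the only writer of the segment's last-sync timestamp; a flush that outlived a flush-policy tick therefore left the timestamp stale, and each intervening tick re-selected the segment and queued another flush of the same binlogs, the duplicate-binlog problem named in the commit title. With the timestamp stamped up front in Sync(), the completion-time update would be redundant, so it is dropped rather than kept as a second writer.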

@@ -898,7 +898,7 @@ func genFlowGraphInsertMsg(chanName string) flowGraphMsg {
 		{
 			ChannelName: chanName,
 			MsgID:       make([]byte, 0),
-			Timestamp:   0,
+			Timestamp:   tsoutil.ComposeTSByTime(time.Now(), 0),
 		},
 	}
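
The last hunk closes a loop in the test fixtures: Sync() now records endPosition.GetTimestamp() as the last-sync time, and a mock position carrying Timestamp: 0 would decode to the Unix epoch, making any segment stamped from it look permanently stale. A small sketch of why, reusing the assumed 18-bit TSO layout from the earlier example:

```go
package main

import (
	"fmt"
	"time"
)

const logicalBits = 18 // assumed TSO layout, as in the earlier sketch

func main() {
	// A zero position timestamp decodes to the Unix epoch...
	var zeroTS uint64
	physical := time.UnixMilli(int64(zeroTS >> logicalBits))
	// ...so a segment stamped from it would exceed any sane staleness
	// window on the very next policy tick and be flushed again.
	fmt.Println(time.Since(physical) > 10*time.Minute) // true
}
```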