mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Branch: pull/20622/head
parent d3dedfc933
commit 6c651d25a4
@@ -281,11 +281,13 @@ dataNode:
   flowGraph:
     maxQueueLength: 1024 # Maximum length of task queue in flowgraph
     maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
-  flush:
+  segment:
     # Max buffer size to flush for a single segment.
     insertBufSize: 16777216 # Bytes, 16 MB
     # Max buffer size to flush del for a single channel
     deleteBufBytes: 67108864 # Bytes, 64MB
+    # The period to sync segments if buffer is not empty.
+    syncPeriod: 600 # Seconds, 10min
 
 
 # Configures the system log output.
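Note on the new `syncPeriod` key: it is specified in seconds and converted to a `time.Duration` by `initSyncPeriod` further down in this diff. A minimal, self-contained sketch of that conversion (the real config loader is not reproduced; `parseSyncPeriod` and its error fallback are illustrative only):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseSyncPeriod converts a seconds-valued config string into a
// time.Duration, falling back to the milvus.yaml default of 600 s
// (10 min) when the value is missing or unparsable.
func parseSyncPeriod(raw string) time.Duration {
	seconds, err := strconv.ParseInt(raw, 10, 64)
	if err != nil || seconds <= 0 {
		seconds = 600 // default from milvus.yaml: syncPeriod: 600
	}
	return time.Duration(seconds) * time.Second
}

func main() {
	fmt.Println(parseSyncPeriod("600")) // 10m0s
	fmt.Println(parseSyncPeriod(""))    // falls back to 10m0s
}
```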
@@ -756,7 +756,7 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
 		return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
 	}
 	if err := c.h.FinishDropChannel(channelName); err != nil {
-		return fmt.Errorf("FinishDropChannel failed, err=%s", err)
+		return fmt.Errorf("FinishDropChannel failed, err=%w", err)
 	}
 	log.Info("removed channel assignment", zap.String("channel name", channelName))
 	return nil
@@ -805,7 +805,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
 
 	log.Info("try to cleanup removal flag ", zap.String("channel name", channelName))
 	if err := c.h.FinishDropChannel(channelName); err != nil {
-		return fmt.Errorf("FinishDropChannel failed, err=%s", err)
+		return fmt.Errorf("FinishDropChannel failed, err=%w", err)
 	}
 
 	log.Info("removed channel assignment", zap.Any("channel", chToCleanUp))
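Note on the `%s` → `%w` changes in this diff: `%w` keeps the cause in the error chain so callers can still match it with `errors.Is`/`errors.As`, while `%s` flattens it to text. A self-contained illustration (`errNotWatched` is a made-up sentinel, not a Milvus error):

```go
package main

import (
	"errors"
	"fmt"
)

var errNotWatched = errors.New("channel is not watched")

func main() {
	// %s formats the cause into the message; the chain is lost.
	flat := fmt.Errorf("FinishDropChannel failed, err=%s", errNotWatched)
	fmt.Println(errors.Is(flat, errNotWatched)) // false

	// %w wraps the cause, so callers can still match it.
	wrapped := fmt.Errorf("FinishDropChannel failed, err=%w", errNotWatched)
	fmt.Println(errors.Is(wrapped, errNotWatched)) // true
}
```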
@@ -88,7 +88,7 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
 	return &datapb.VchannelInfo{
 		CollectionID:        channel.CollectionID,
 		ChannelName:         channel.Name,
-		SeekPosition:        h.getChannelCheckpoint(channel),
+		SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
 		FlushedSegmentIds:   flushedIDs.Collect(),
 		UnflushedSegmentIds: unflushedIDs.Collect(),
 		DroppedSegmentIds:   droppedIDs.Collect(),
@@ -151,44 +151,119 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
 	return &datapb.VchannelInfo{
 		CollectionID:        channel.CollectionID,
 		ChannelName:         channel.Name,
-		SeekPosition:        h.getChannelCheckpoint(channel),
+		SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
 		FlushedSegmentIds:   indexedIDs.Collect(),
 		UnflushedSegmentIds: unIndexedIDs.Collect(),
 		DroppedSegmentIds:   droppedIDs.Collect(),
 	}
 }
 
-func (h *ServerHandler) getChannelCheckpoint(channel *channel) *internalpb.MsgPosition {
-	seekPosition := h.s.meta.GetChannelCheckpoint(channel.Name)
-	if seekPosition != nil {
-		log.Info("channel seek position set from ChannelCP",
-			zap.String("channel", channel.Name),
-			zap.Uint64("position timestamp", seekPosition.Timestamp),
-			zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
-		)
-	} else {
-		// use collection start position when segment position is not found
-		if channel.StartPositions == nil {
-			collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
-			if collection != nil && err == nil {
-				seekPosition = getCollectionStartPosition(channel.Name, collection)
-			}
-			log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
-				zap.String("channel", channel.Name),
-				zap.Uint64("position timestamp", seekPosition.GetTimestamp()),
-				zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
-			)
-		} else {
-			// use passed start positions, skip to ask RootCoord.
-			seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
-			log.Info("segment position not found, setting channel seek position to channel start position",
-				zap.String("channel", channel.Name),
-				zap.Uint64("position timestamp", seekPosition.GetTimestamp()),
-				zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
-			)
-		}
-	}
-	return seekPosition
+// getEarliestSegmentDMLPos returns the earliest dml position of segments,
+// this is mainly for COMPATIBILITY with old version <=2.1.x
+func (h *ServerHandler) getEarliestSegmentDMLPos(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
+	var minPos *internalpb.MsgPosition
+	var minPosSegID int64
+	var minPosTs uint64
+	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
+		return s.InsertChannel == channel.Name
+	})
+	for _, s := range segments {
+		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
+			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
+			continue
+		}
+		if s.GetIsImporting() {
+			// Skip bulk insert segments.
+			continue
+		}
+		if s.GetState() == commonpb.SegmentState_Dropped {
+			continue
+		}
+
+		var segmentPosition *internalpb.MsgPosition
+		if s.GetDmlPosition() != nil {
+			segmentPosition = s.GetDmlPosition()
+		} else {
+			segmentPosition = s.GetStartPosition()
+		}
+		if minPos == nil || segmentPosition.Timestamp < minPos.Timestamp {
+			minPosSegID = s.GetID()
+			minPosTs = segmentPosition.GetTimestamp()
+			minPos = segmentPosition
+		}
+	}
+	if minPos != nil {
+		log.Info("getEarliestSegmentDMLPos done",
+			zap.Int64("segment ID", minPosSegID),
+			zap.Uint64("posTs", minPosTs),
+			zap.Time("posTime", tsoutil.PhysicalTime(minPosTs)))
+	}
+	return minPos
+}
+
+// getCollectionStartPos returns collection start position.
+func (h *ServerHandler) getCollectionStartPos(channel *channel) *internalpb.MsgPosition {
+	// use collection start position when segment position is not found
+	var startPosition *internalpb.MsgPosition
+	if channel.StartPositions == nil {
+		collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
+		if collection != nil && err == nil {
+			startPosition = getCollectionStartPosition(channel.Name, collection)
+		}
+		log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
+			zap.String("channel", channel.Name),
+			zap.Uint64("posTs", startPosition.GetTimestamp()),
+			zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
+		)
+	} else {
+		// use passed start positions, skip to ask RootCoord.
+		startPosition = toMsgPosition(channel.Name, channel.StartPositions)
+		log.Info("segment position not found, setting channel seek position to channel start position",
+			zap.String("channel", channel.Name),
+			zap.Uint64("posTs", startPosition.GetTimestamp()),
+			zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
+		)
+	}
+	return startPosition
+}
+
+// GetChannelSeekPosition gets channel seek position from:
+// 1. Channel checkpoint meta;
+// 2. Segments earliest dml position;
+// 3. Collection start position;
+// And would return if any position is valid.
+func (h *ServerHandler) GetChannelSeekPosition(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
+	var seekPosition *internalpb.MsgPosition
+	seekPosition = h.s.meta.GetChannelCheckpoint(channel.Name)
+	if seekPosition != nil {
+		log.Info("channel seek position set from channel checkpoint meta",
+			zap.String("channel", channel.Name),
+			zap.Uint64("posTs", seekPosition.Timestamp),
+			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
+		return seekPosition
+	}
+
+	seekPosition = h.getEarliestSegmentDMLPos(channel, partitionID)
+	if seekPosition != nil {
+		log.Info("channel seek position set from earliest segment dml position",
+			zap.String("channel", channel.Name),
+			zap.Uint64("posTs", seekPosition.Timestamp),
+			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
+		return seekPosition
+	}
+
+	seekPosition = h.getCollectionStartPos(channel)
+	if seekPosition != nil {
+		log.Info("channel seek position set from collection start position",
+			zap.String("channel", channel.Name),
+			zap.Uint64("posTs", seekPosition.Timestamp),
+			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
+		return seekPosition
+	}
+
+	log.Warn("get channel checkpoint failed, channelCPMeta and earliestSegDMLPos and collStartPos are all invalid",
+		zap.String("channel", channel.Name))
+	return nil
 }
 
 func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *internalpb.MsgPosition {
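The new `GetChannelSeekPosition` entry point tries three sources in priority order and returns the first valid one: channel checkpoint meta, then the earliest segment DML position, then the collection start position. A stripped-down sketch of that fallback pattern (placeholder types stand in for the Milvus internals):

```go
package main

import "fmt"

// position is a stand-in for internalpb.MsgPosition.
type position struct {
	Channel string
	Ts      uint64
}

// seekPosition returns the first non-nil position from the candidate
// sources, mirroring GetChannelSeekPosition's priority order:
// channel checkpoint > earliest segment DML position > collection start.
func seekPosition(sources ...func() *position) *position {
	for _, source := range sources {
		if pos := source(); pos != nil {
			return pos
		}
	}
	return nil // all sources were invalid
}

func main() {
	fromCheckpoint := func() *position { return nil }                 // no checkpoint stored yet
	fromSegments := func() *position { return &position{"ch1", 50} }  // earliest segment DML position
	fromCollection := func() *position { return &position{"ch1", 1} } // collection start position

	pos := seekPosition(fromCheckpoint, fromSegments, fromCollection)
	fmt.Printf("seek to %s@%d\n", pos.Channel, pos.Ts) // prints: seek to ch1@50
}
```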
@@ -1142,13 +1142,13 @@ func (m *meta) GetCompactionTo(segmentID int64) *SegmentInfo {
 
 // UpdateChannelCheckpoint updates and saves channel checkpoint.
 func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *internalpb.MsgPosition) error {
-	m.Lock()
-	defer m.Unlock()
-
 	if pos == nil {
 		return fmt.Errorf("channelCP is nil, vChannel=%s", vChannel)
 	}
 
+	m.Lock()
+	defer m.Unlock()
+
 	oldPosition, ok := m.channelCPs[vChannel]
 	if !ok || oldPosition.Timestamp < pos.Timestamp {
 		err := m.catalog.SaveChannelCheckpoint(m.ctx, vChannel, pos)
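Two things are worth noting in `UpdateChannelCheckpoint`: the nil check now runs before the mutex is taken, so invalid input returns without contending the lock, and (as before) the checkpoint only advances when the new timestamp is strictly newer. A self-contained sketch of that shape, with the catalog persistence reduced to a map write:

```go
package main

import (
	"fmt"
	"sync"
)

type checkpointStore struct {
	mu  sync.Mutex
	cps map[string]uint64 // channel -> checkpoint timestamp
}

// update mirrors the shape of UpdateChannelCheckpoint: reject bad input
// before taking the lock, then persist only if the position moves forward.
// (The real code saves to the catalog first and returns its error.)
func (s *checkpointStore) update(channel string, ts uint64) error {
	if ts == 0 { // stand-in for the pos == nil guard
		return fmt.Errorf("channelCP is invalid, vChannel=%s", channel)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	old, ok := s.cps[channel]
	if !ok || old < ts {
		s.cps[channel] = ts
	}
	return nil
}

func main() {
	s := &checkpointStore{cps: map[string]uint64{}}
	_ = s.update("ch1", 100)
	_ = s.update("ch1", 50)  // stale update, silently ignored
	fmt.Println(s.cps["ch1"]) // 100
}
```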
@@ -1644,61 +1644,98 @@ func TestDataNodeTtChannel(t *testing.T) {
 	})
 }
 
-func TestGetChannelCheckpoint(t *testing.T) {
-	svr := newTestServer(t, nil)
-	defer closeTestServer(t, svr)
-	schema := newTestSchema()
-	svr.meta.AddCollection(&collectionInfo{
-		ID:     0,
-		Schema: schema,
-		StartPositions: []*commonpb.KeyDataPair{
-			{
-				Key:  "ch1",
-				Data: []byte{8, 9, 10},
-			},
-		},
-	})
-	svr.meta.AddCollection(&collectionInfo{
-		ID:     1,
-		Schema: schema,
-		StartPositions: []*commonpb.KeyDataPair{
-			{
-				Key:  "ch0",
-				Data: []byte{11, 12, 13},
-			},
-		},
-	})
-
-	t.Run("get non-existent channel", func(t *testing.T) {
-		channelCP := svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "chx1", CollectionID: 0})
-		assert.Nil(t, channelCP)
-	})
-
-	t.Run("get no channelCP in meta", func(t *testing.T) {
-		channelCP := svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "ch1", CollectionID: 0})
-		assert.NotNil(t, channelCP)
-		assert.EqualValues(t, []byte{8, 9, 10}, channelCP.GetMsgID())
-		channelCP = svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "ch0", CollectionID: 1})
-		assert.NotNil(t, channelCP)
-		assert.EqualValues(t, []byte{11, 12, 13}, channelCP.GetMsgID())
-	})
-
-	t.Run("empty collection", func(t *testing.T) {
-		channelCP := svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "ch0_suffix", CollectionID: 2})
-		assert.Nil(t, channelCP)
-	})
-
-	t.Run("with channel cp", func(t *testing.T) {
-		err := svr.meta.UpdateChannelCheckpoint("ch1", &internalpb.MsgPosition{
-			ChannelName: "ch1",
-			Timestamp:   100,
-		})
-		assert.NoError(t, err)
-		channelCP := svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "ch1", CollectionID: 1})
-		assert.NotNil(t, channelCP)
-		assert.True(t, channelCP.ChannelName == "ch1")
-		assert.True(t, channelCP.Timestamp == 100)
-	})
+func TestGetChannelSeekPosition(t *testing.T) {
+	startPos1 := []*commonpb.KeyDataPair{
+		{
+			Key:  "ch1",
+			Data: []byte{1, 2, 3},
+		},
+	}
+	startPosNonExist := []*commonpb.KeyDataPair{
+		{
+			Key:  "ch2",
+			Data: []byte{4, 5, 6},
+		},
+	}
+
+	tests := []struct {
+		testName     string
+		channelCP    *internalpb.MsgPosition
+		segDMLPos    []*internalpb.MsgPosition
+		collStartPos []*commonpb.KeyDataPair
+		channelName  string
+		expectedPos  *internalpb.MsgPosition
+	}{
+		{"test-with-channelCP",
+			&internalpb.MsgPosition{ChannelName: "ch1", Timestamp: 100},
+			[]*internalpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50}, {ChannelName: "ch1", Timestamp: 200}},
+			startPos1,
+			"ch1", &internalpb.MsgPosition{ChannelName: "ch1", Timestamp: 100}},
+
+		{"test-with-segmentDMLPos",
+			nil,
+			[]*internalpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50}, {ChannelName: "ch1", Timestamp: 200}},
+			startPos1,
+			"ch1", &internalpb.MsgPosition{ChannelName: "ch1", Timestamp: 50}},
+
+		{"test-with-collStartPos",
+			nil,
+			nil,
+			startPos1,
+			"ch1", &internalpb.MsgPosition{ChannelName: "ch1", MsgID: startPos1[0].Data}},
+
+		{"test-non-exist-channel-1",
+			nil,
+			nil,
+			startPosNonExist,
+			"ch1", nil},
+
+		{"test-non-exist-channel-2",
+			nil,
+			nil,
+			nil,
+			"ch1", nil},
+	}
+	for _, test := range tests {
+		t.Run(test.testName, func(t *testing.T) {
+			svr := newTestServer(t, nil)
+			defer closeTestServer(t, svr)
+			schema := newTestSchema()
+			if test.collStartPos != nil {
+				svr.meta.AddCollection(&collectionInfo{
+					ID:             0,
+					Schema:         schema,
+					StartPositions: test.collStartPos,
+				})
+			}
+			for i, segPos := range test.segDMLPos {
+				seg := &datapb.SegmentInfo{
+					ID:            UniqueID(i),
+					CollectionID:  0,
+					PartitionID:   0,
+					DmlPosition:   segPos,
+					InsertChannel: "ch1",
+				}
+				err := svr.meta.AddSegment(NewSegmentInfo(seg))
+				assert.NoError(t, err)
+			}
+			if test.channelCP != nil {
+				err := svr.meta.UpdateChannelCheckpoint(test.channelCP.ChannelName, test.channelCP)
+				assert.NoError(t, err)
+			}
+
+			seekPos := svr.handler.(*ServerHandler).GetChannelSeekPosition(&channel{
+				Name:         test.channelName,
+				CollectionID: 0}, allPartitionID)
+			if test.expectedPos == nil {
+				assert.True(t, seekPos == nil)
+			} else {
+				assert.Equal(t, test.expectedPos.ChannelName, seekPos.ChannelName)
+				assert.Equal(t, test.expectedPos.Timestamp, seekPos.Timestamp)
+				assert.ElementsMatch(t, test.expectedPos.MsgID, seekPos.MsgID)
+			}
+		})
+	}
 }
 
 func TestGetDataVChanPositions(t *testing.T) {
@@ -521,7 +521,7 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, startP
 	if !loaded {
 		buffer, err = newBufferData(collSchema)
 		if err != nil {
-			return fmt.Errorf("newBufferData failed, segment=%d, channel=%s, err=%s", currentSegID, ibNode.channelName, err)
+			return fmt.Errorf("newBufferData failed, segment=%d, channel=%s, err=%w", currentSegID, ibNode.channelName, err)
 		}
 	}
 
@@ -58,7 +58,7 @@ func (ttn *ttNode) Name() string {
 // Operate handles input messages, implementing flowgraph.Node
 func (ttn *ttNode) Operate(in []Msg) []Msg {
 	if in == nil {
-		log.Debug("type assertion failed for flowGraphMsg because it's nil")
+		log.Warn("type assertion failed for flowGraphMsg because it's nil")
 		return []Msg{}
 	}
 
@@ -134,7 +134,7 @@ func (s *Segment) evictHistoryInsertBuffer(endPos *internalpb.MsgPosition) {
 	}
 	s.historyInsertBuf = tmpBuffers
 	ts, _ := tsoutil.ParseTS(endPos.Timestamp)
-	log.Debug("evictHistoryInsertBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName))
+	log.Info("evictHistoryInsertBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName))
 }
 
 // rollDeleteBuffer moves curDeleteBuf to historyDeleteBuf, and then sets curDeleteBuf to nil.
@@ -157,7 +157,7 @@ func (s *Segment) evictHistoryDeleteBuffer(endPos *internalpb.MsgPosition) {
 	}
 	s.historyDeleteBuf = tmpBuffers
 	ts, _ := tsoutil.ParseTS(endPos.Timestamp)
-	log.Debug("evictHistoryDeleteBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName))
+	log.Info("evictHistoryDeleteBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName))
 }
 
 func (s *Segment) isBufferEmpty() bool {
@@ -17,15 +17,9 @@
 package datanode
 
 import (
-	"time"
-
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
 )
 
-const (
-	syncPeriod = 10 * time.Minute // TODO: move to config?
-)
-
 // segmentSyncPolicy sync policy applies to segment
 type segmentSyncPolicy func(segment *Segment, ts Timestamp) bool
 
@@ -34,6 +28,7 @@ func syncPeriodically() segmentSyncPolicy {
 	return func(segment *Segment, ts Timestamp) bool {
 		endTime := tsoutil.PhysicalTime(ts)
 		lastSyncTime := tsoutil.PhysicalTime(segment.lastSyncTs)
-		return endTime.Sub(lastSyncTime) >= syncPeriod && !segment.isBufferEmpty()
+		return endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod &&
+			!segment.isBufferEmpty()
 	}
 }
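`syncPeriodically` returns a closure satisfying `segmentSyncPolicy`; with this change the period comes from configuration rather than the removed `syncPeriod` constant. A self-contained sketch of the same decision rule, using plain `time.Time` in place of Milvus TSO timestamps:

```go
package main

import (
	"fmt"
	"time"
)

// syncPolicy decides, given the current time, whether a segment's buffer
// should be flushed.
type syncPolicy func(lastSync time.Time, bufferEmpty bool, now time.Time) bool

// periodically returns a policy that fires once the buffer is non-empty
// and at least `period` has elapsed since the last sync.
func periodically(period time.Duration) syncPolicy {
	return func(lastSync time.Time, bufferEmpty bool, now time.Time) bool {
		return now.Sub(lastSync) >= period && !bufferEmpty
	}
}

func main() {
	policy := periodically(10 * time.Minute)
	t0 := time.Now()
	fmt.Println(policy(t0, false, t0.Add(11*time.Minute))) // true: stale and non-empty
	fmt.Println(policy(t0, true, t0.Add(11*time.Minute)))  // false: nothing buffered
	fmt.Println(policy(t0, false, t0.Add(5*time.Minute)))  // false: not stale yet
}
```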
@@ -35,10 +35,10 @@ func TestSyncPeriodically(t *testing.T) {
 		isBufferEmpty bool
 		shouldSync    bool
 	}{
-		{"test buffer empty and stale", t0, t0.Add(syncPeriod), true, false},
-		{"test buffer empty and not stale", t0, t0.Add(syncPeriod / 2), true, false},
-		{"test buffer not empty and stale", t0, t0.Add(syncPeriod), false, true},
-		{"test buffer not empty and not stale", t0, t0.Add(syncPeriod / 2), false, false},
+		{"test buffer empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod), true, false},
+		{"test buffer empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod / 2), true, false},
+		{"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod), false, true},
+		{"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod / 2), false, false},
 	}
 
 	for _, test := range tests {
@@ -1387,8 +1387,11 @@ type dataNodeConfig struct {
 	Port                    int
 	FlowGraphMaxQueueLength int32
 	FlowGraphMaxParallelism int32
-	FlushInsertBufferSize   int64
-	FlushDeleteBufferBytes  int64
+
+	// segment
+	FlushInsertBufferSize  int64
+	FlushDeleteBufferBytes int64
+	SyncPeriod             time.Duration
 
 	Alias string // Different datanode in one machine
 
@@ -1409,6 +1412,7 @@ func (p *dataNodeConfig) init(base *BaseTable) {
 	p.initFlowGraphMaxParallelism()
 	p.initFlushInsertBufferSize()
 	p.initFlushDeleteBufferSize()
+	p.initSyncPeriod()
 	p.initIOConcurrency()
 
 	p.initChannelWatchPath()
@@ -1428,7 +1432,7 @@ func (p *dataNodeConfig) initFlowGraphMaxParallelism() {
 }
 
 func (p *dataNodeConfig) initFlushInsertBufferSize() {
-	bufferSize := p.Base.LoadWithDefault2([]string{"DATA_NODE_IBUFSIZE", "datanode.flush.insertBufSize"}, "0")
+	bufferSize := p.Base.LoadWithDefault2([]string{"DATA_NODE_IBUFSIZE", "datanode.segment.insertBufSize"}, "0")
 	bs, err := strconv.ParseInt(bufferSize, 10, 64)
 	if err != nil {
 		panic(err)
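`LoadWithDefault2` is given the env alias `DATA_NODE_IBUFSIZE` ahead of the YAML key, so the first key that resolves wins. A rough, self-contained imitation of that precedence (environment variables stand in for the real config table here; `loadWithDefault` is illustrative, not the BaseTable API):

```go
package main

import (
	"fmt"
	"os"
)

// loadWithDefault tries each key in order and falls back to the default
// when none is set, mimicking the lookup order of
// LoadWithDefault2([]string{"DATA_NODE_IBUFSIZE", "datanode.segment.insertBufSize"}, "0").
func loadWithDefault(keys []string, def string) string {
	for _, key := range keys {
		if v, ok := os.LookupEnv(key); ok && v != "" {
			return v
		}
	}
	return def
}

func main() {
	os.Setenv("DATA_NODE_IBUFSIZE", "33554432") // operator override: 32 MB
	size := loadWithDefault([]string{"DATA_NODE_IBUFSIZE", "datanode.segment.insertBufSize"}, "0")
	fmt.Println(size) // 33554432
}
```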
@@ -1437,11 +1441,16 @@ func (p *dataNodeConfig) initFlushInsertBufferSize() {
 }
 
 func (p *dataNodeConfig) initFlushDeleteBufferSize() {
-	deleteBufBytes := p.Base.ParseInt64WithDefault("datanode.flush.deleteBufBytes",
+	deleteBufBytes := p.Base.ParseInt64WithDefault("datanode.segment.deleteBufBytes",
 		64*1024*1024)
 	p.FlushDeleteBufferBytes = deleteBufBytes
 }
 
+func (p *dataNodeConfig) initSyncPeriod() {
+	syncPeriodInSeconds := p.Base.ParseInt64WithDefault("datanode.segment.syncPeriod", 600)
+	p.SyncPeriod = time.Duration(syncPeriodInSeconds) * time.Second
+}
+
 func (p *dataNodeConfig) initChannelWatchPath() {
 	p.ChannelWatchSubPath = "channelwatch"
 }
@@ -313,6 +313,10 @@ func TestComponentParam(t *testing.T) {
 		size := Params.FlushInsertBufferSize
 		t.Logf("FlushInsertBufferSize: %d", size)
 
+		period := Params.SyncPeriod
+		t.Logf("SyncPeriod: %v", period)
+		assert.Equal(t, 10*time.Minute, Params.SyncPeriod)
+
 		Params.CreatedTime = time.Now()
 		t.Logf("CreatedTime: %v", Params.CreatedTime)
 