mirror of https://github.com/milvus-io/milvus.git
Add mergedTimetickSender for datanode (#11297)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/11316/head
parent
d81b15163f
commit
0e5931a3da
|
@ -68,6 +68,7 @@ type insertBufferNode struct {
|
|||
timeTickStream msgstream.MsgStream
|
||||
segmentStatisticsStream msgstream.MsgStream
|
||||
ttLogger timeTickLogger
|
||||
ttMerger *mergedTimeTickerSender
|
||||
}
|
||||
|
||||
type timeTickLogger struct {
|
||||
|
@ -154,6 +155,8 @@ func (ibNode *insertBufferNode) Name() string {
|
|||
}
|
||||
|
||||
func (ibNode *insertBufferNode) Close() {
|
||||
ibNode.ttMerger.close()
|
||||
|
||||
if ibNode.timeTickStream != nil {
|
||||
ibNode.timeTickStream.Close()
|
||||
}
|
||||
|
@ -667,25 +670,8 @@ func readBinary(reader io.Reader, receiver interface{}, dataType schemapb.DataTy
|
|||
// writeHardTimeTick writes timetick once insertBufferNode operates.
|
||||
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
|
||||
ibNode.ttLogger.LogTs(ts)
|
||||
msgPack := msgstream.MsgPack{}
|
||||
timeTickMsg := msgstream.DataNodeTtMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
BeginTimestamp: ts,
|
||||
EndTimestamp: ts,
|
||||
HashValues: []uint32{0},
|
||||
},
|
||||
DataNodeTtMsg: datapb.DataNodeTtMsg{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_DataNodeTt,
|
||||
MsgID: 0,
|
||||
Timestamp: ts,
|
||||
},
|
||||
ChannelName: ibNode.channelName,
|
||||
Timestamp: ts,
|
||||
},
|
||||
}
|
||||
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
|
||||
return ibNode.timeTickStream.Produce(&msgPack)
|
||||
ibNode.ttMerger.bufferTs(ts)
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadMemStates2Coord uploads latest changed segments statistics in DataNode memory to DataCoord
|
||||
|
@ -782,6 +768,28 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
|
|||
var segStatisticsMsgStream msgstream.MsgStream = segS
|
||||
segStatisticsMsgStream.Start()
|
||||
|
||||
mt := newMergedTimeTickerSender(func(ts Timestamp) error {
|
||||
msgPack := msgstream.MsgPack{}
|
||||
timeTickMsg := msgstream.DataNodeTtMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
BeginTimestamp: ts,
|
||||
EndTimestamp: ts,
|
||||
HashValues: []uint32{0},
|
||||
},
|
||||
DataNodeTtMsg: datapb.DataNodeTtMsg{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_DataNodeTt,
|
||||
MsgID: 0,
|
||||
Timestamp: ts,
|
||||
},
|
||||
ChannelName: config.vChannelName,
|
||||
Timestamp: ts,
|
||||
},
|
||||
}
|
||||
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
|
||||
return wTtMsgStream.Produce(&msgPack)
|
||||
})
|
||||
|
||||
return &insertBufferNode{
|
||||
BaseNode: baseNode,
|
||||
insertBuffer: sync.Map{},
|
||||
|
@ -797,5 +805,6 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
|
|||
replica: config.replica,
|
||||
idAllocator: config.allocator,
|
||||
channelName: config.vChannelName,
|
||||
ttMerger: mt,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -143,7 +143,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
|
|||
|
||||
for _, test := range invalidInTests {
|
||||
te.Run(test.description, func(t0 *testing.T) {
|
||||
ibn := &insertBufferNode{}
|
||||
ibn := &insertBufferNode{
|
||||
ttMerger: newMergedTimeTickerSender(func(Timestamp) error { return nil }),
|
||||
}
|
||||
rt := ibn.Operate(test.in)
|
||||
assert.Empty(t0, rt)
|
||||
})
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datanode
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
type sendTimeTick func(Timestamp) error
|
||||
|
||||
// mergedTimeTickerSender reduces time ticker sending rate when datanode is doing `fast-forwarding`
|
||||
// it makes sure time ticker send at most 10 times a second (1tick/100millisecond)
|
||||
// and the last time tick is always sent
|
||||
type mergedTimeTickerSender struct {
|
||||
ts atomic.Uint64 // current ts value
|
||||
cond *sync.Cond // condition to send timeticker
|
||||
send sendTimeTick // actual sender logic
|
||||
|
||||
lastSent time.Time
|
||||
lastMut sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
closeCh chan struct{}
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
func newMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender {
|
||||
mt := &mergedTimeTickerSender{
|
||||
cond: sync.NewCond(&sync.Mutex{}),
|
||||
send: send,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
mt.ts.Store(0) // 0 for not tt send
|
||||
|
||||
mt.wg.Add(2)
|
||||
go mt.tick()
|
||||
go mt.work()
|
||||
|
||||
return mt
|
||||
}
|
||||
|
||||
func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp) {
|
||||
mt.ts.Store(ts)
|
||||
mt.lastMut.RLock()
|
||||
defer mt.lastMut.RUnlock()
|
||||
|
||||
if !mt.lastSent.IsZero() && time.Since(mt.lastSent) > time.Millisecond*100 {
|
||||
mt.cond.Signal()
|
||||
}
|
||||
}
|
||||
|
||||
func (mt *mergedTimeTickerSender) tick() {
|
||||
defer mt.wg.Done()
|
||||
// this duration might be configuable in the future
|
||||
t := time.Tick(time.Millisecond * 100) // 100 millisecond, 1/2 of rootcoord timetick duration
|
||||
for {
|
||||
select {
|
||||
case <-t:
|
||||
mt.cond.Signal() // allow worker to check every 0.1s
|
||||
case <-mt.closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mt *mergedTimeTickerSender) work() {
|
||||
defer mt.wg.Done()
|
||||
ts, lastTs := uint64(0), uint64(0)
|
||||
for {
|
||||
select {
|
||||
case <-mt.closeCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
mt.cond.L.Lock()
|
||||
mt.cond.Wait()
|
||||
ts = mt.ts.Load()
|
||||
mt.cond.L.Unlock()
|
||||
if ts != lastTs {
|
||||
mt.send(ts)
|
||||
lastTs = ts
|
||||
mt.lastMut.Lock()
|
||||
mt.lastSent = time.Now()
|
||||
mt.lastMut.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mt *mergedTimeTickerSender) close() {
|
||||
mt.closeOnce.Do(func() {
|
||||
close(mt.closeCh)
|
||||
mt.cond.Broadcast()
|
||||
mt.wg.Wait()
|
||||
})
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package datanode
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMergedTimeTicker(t *testing.T) {
|
||||
var ticks []uint64
|
||||
var mut sync.Mutex
|
||||
|
||||
mt := newMergedTimeTickerSender(func(ts Timestamp) error {
|
||||
mut.Lock()
|
||||
defer mut.Unlock()
|
||||
ticks = append(ticks, ts)
|
||||
return nil
|
||||
})
|
||||
|
||||
for i := 1; i < 100; i++ {
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
mt.bufferTs(uint64(i))
|
||||
}
|
||||
mt.close()
|
||||
mut.Lock()
|
||||
assert.EqualValues(t, 99, ticks[len(ticks)-1])
|
||||
assert.Less(t, len(ticks), 20)
|
||||
mut.Unlock()
|
||||
}
|
Loading…
Reference in New Issue