mirror of https://github.com/milvus-io/milvus.git
Rename newTimeTickManager to newTimeTickSender (#25415)
Signed-off-by: wayblink <anyang.wang@zilliz.com>pull/25485/head
parent
74c4b28ef1
commit
fc12d3997c
|
@ -543,7 +543,7 @@ func (node *DataNode) Start() error {
|
||||||
go node.compactionExecutor.start(node.ctx)
|
go node.compactionExecutor.start(node.ctx)
|
||||||
|
|
||||||
if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
|
if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
|
||||||
node.timeTickSender = newTimeTickManager(node.dataCoord, node.session.ServerID)
|
node.timeTickSender = newTimeTickSender(node.dataCoord, node.session.ServerID)
|
||||||
go node.timeTickSender.start(node.ctx)
|
go node.timeTickSender.start(node.ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -277,7 +277,7 @@ func TestDataSyncService_Start(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
atimeTickSender := newTimeTickManager(dataCoord, 0)
|
atimeTickSender := newTimeTickSender(dataCoord, 0)
|
||||||
sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
|
sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
@ -439,7 +439,7 @@ func TestDataSyncService_Close(t *testing.T) {
|
||||||
syncPeriodically(),
|
syncPeriodically(),
|
||||||
syncMemoryTooHigh(),
|
syncMemoryTooHigh(),
|
||||||
}
|
}
|
||||||
atimeTickSender := newTimeTickManager(mockDataCoord, 0)
|
atimeTickSender := newTimeTickSender(mockDataCoord, 0)
|
||||||
syncService, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
|
syncService, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dataCoord := &DataCoordFactory{}
|
dataCoord := &DataCoordFactory{}
|
||||||
atimeTickSender := newTimeTickManager(dataCoord, 0)
|
atimeTickSender := newTimeTickSender(dataCoord, 0)
|
||||||
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
||||||
assert.NotNil(t, iBNode)
|
assert.NotNil(t, iBNode)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -211,7 +211,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dataCoord := &DataCoordFactory{}
|
dataCoord := &DataCoordFactory{}
|
||||||
atimeTickSender := newTimeTickManager(dataCoord, 0)
|
atimeTickSender := newTimeTickSender(dataCoord, 0)
|
||||||
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -389,7 +389,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
||||||
delBufHeap: &PriorityQueue{},
|
delBufHeap: &PriorityQueue{},
|
||||||
}
|
}
|
||||||
dataCoord := &DataCoordFactory{}
|
dataCoord := &DataCoordFactory{}
|
||||||
atimeTickSender := newTimeTickManager(dataCoord, 0)
|
atimeTickSender := newTimeTickSender(dataCoord, 0)
|
||||||
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -637,7 +637,7 @@ func TestInsertBufferNodeRollBF(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dataCoord := &DataCoordFactory{}
|
dataCoord := &DataCoordFactory{}
|
||||||
atimeTickSender := newTimeTickManager(dataCoord, 0)
|
atimeTickSender := newTimeTickSender(dataCoord, 0)
|
||||||
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -1015,7 +1015,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dataCoord := &DataCoordFactory{}
|
dataCoord := &DataCoordFactory{}
|
||||||
atimeTickSender := newTimeTickManager(dataCoord, 0)
|
atimeTickSender := newTimeTickSender(dataCoord, 0)
|
||||||
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c, atimeTickSender)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ type segmentStatesSequence struct {
|
||||||
data map[uint64][]*commonpb.SegmentStats // ts -> segmentStats
|
data map[uint64][]*commonpb.SegmentStats // ts -> segmentStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTimeTickManager(dataCoord types.DataCoord, nodeID int64) *timeTickSender {
|
func newTimeTickSender(dataCoord types.DataCoord, nodeID int64) *timeTickSender {
|
||||||
return &timeTickSender{
|
return &timeTickSender{
|
||||||
nodeID: nodeID,
|
nodeID: nodeID,
|
||||||
dataCoord: dataCoord,
|
dataCoord: dataCoord,
|
||||||
|
|
|
@ -32,7 +32,7 @@ import (
|
||||||
|
|
||||||
func TestTimetickManagerNormal(t *testing.T) {
|
func TestTimetickManagerNormal(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
manager := newTimeTickManager(&DataCoordFactory{}, 0)
|
manager := newTimeTickSender(&DataCoordFactory{}, 0)
|
||||||
|
|
||||||
channelName1 := "channel1"
|
channelName1 := "channel1"
|
||||||
ts := uint64(time.Now().Unix())
|
ts := uint64(time.Now().Unix())
|
||||||
|
@ -128,7 +128,7 @@ func TestTimetickManagerNormal(t *testing.T) {
|
||||||
|
|
||||||
func TestTimetickManagerSendErr(t *testing.T) {
|
func TestTimetickManagerSendErr(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
manager := newTimeTickManager(&DataCoordFactory{ReportDataNodeTtMsgsError: true}, 0)
|
manager := newTimeTickSender(&DataCoordFactory{ReportDataNodeTtMsgsError: true}, 0)
|
||||||
|
|
||||||
channelName1 := "channel1"
|
channelName1 := "channel1"
|
||||||
ts := uint64(time.Now().Unix())
|
ts := uint64(time.Now().Unix())
|
||||||
|
@ -147,7 +147,7 @@ func TestTimetickManagerSendErr(t *testing.T) {
|
||||||
|
|
||||||
func TestTimetickManagerSendNotSuccess(t *testing.T) {
|
func TestTimetickManagerSendNotSuccess(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
manager := newTimeTickManager(&DataCoordFactory{ReportDataNodeTtMsgsNotSuccess: true}, 0)
|
manager := newTimeTickSender(&DataCoordFactory{ReportDataNodeTtMsgsNotSuccess: true}, 0)
|
||||||
|
|
||||||
channelName1 := "channel1"
|
channelName1 := "channel1"
|
||||||
ts := uint64(time.Now().Unix())
|
ts := uint64(time.Now().Unix())
|
||||||
|
@ -178,7 +178,7 @@ func TestTimetickManagerSendReport(t *testing.T) {
|
||||||
}).Return(&commonpb.Status{
|
}).Return(&commonpb.Status{
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
ErrorCode: commonpb.ErrorCode_Success,
|
||||||
}, nil)
|
}, nil)
|
||||||
manager := newTimeTickManager(mockDataCoord, 0)
|
manager := newTimeTickSender(mockDataCoord, 0)
|
||||||
go manager.start(ctx)
|
go manager.start(ctx)
|
||||||
|
|
||||||
assert.Eventually(t, func() bool {
|
assert.Eventually(t, func() bool {
|
||||||
|
|
Loading…
Reference in New Issue