mirror of https://github.com/milvus-io/milvus.git
Fix datarace of flow_graph_manager_test.go in package datanode (#23043)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/23055/head
parent
d6bfe8fbb9
commit
e3c3c949c4
|
@ -210,32 +210,30 @@ func TestFlowGraphManager(t *testing.T) {
|
||||||
fm.dropAll()
|
fm.dropAll()
|
||||||
const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-"
|
const channelPrefix = "by-dev-rootcoord-dml-test-fg-mgr-execute-"
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.testName, func(t *testing.T) {
|
var baseParams = &Params.BaseTable
|
||||||
var baseParams = &Params.BaseTable
|
baseParams.Save(Params.DataNodeCfg.MemoryWatermark.Key, fmt.Sprintf("%f", test.watermark))
|
||||||
baseParams.Save(Params.DataNodeCfg.MemoryWatermark.Key, fmt.Sprintf("%f", test.watermark))
|
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncEnable.Key, fmt.Sprintf("%t", true))
|
||||||
baseParams.Save(Params.DataNodeCfg.MemoryForceSyncEnable.Key, fmt.Sprintf("%t", true))
|
for i, memorySize := range test.memorySizes {
|
||||||
for i, memorySize := range test.memorySizes {
|
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
|
||||||
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
|
vchan := &datapb.VchannelInfo{
|
||||||
vchan := &datapb.VchannelInfo{
|
ChannelName: vchannel,
|
||||||
ChannelName: vchannel,
|
|
||||||
}
|
|
||||||
err = fm.addAndStart(node, vchan, nil, genTestTickler())
|
|
||||||
assert.NoError(t, err)
|
|
||||||
fg, ok := fm.flowgraphs.Load(vchannel)
|
|
||||||
assert.True(t, ok)
|
|
||||||
err = fg.(*dataSyncService).channel.addSegment(addSegmentReq{segID: 0})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
fg.(*dataSyncService).channel.updateSegmentMemorySize(0, memorySize)
|
|
||||||
fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Store(false)
|
|
||||||
}
|
}
|
||||||
fm.execute(test.totalMemory)
|
err = fm.addAndStart(node, vchan, nil, genTestTickler())
|
||||||
for i, needToSync := range test.expectNeedToSync {
|
assert.NoError(t, err)
|
||||||
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
|
fg, ok := fm.flowgraphs.Load(vchannel)
|
||||||
fg, ok := fm.flowgraphs.Load(vchannel)
|
assert.True(t, ok)
|
||||||
assert.True(t, ok)
|
err = fg.(*dataSyncService).channel.addSegment(addSegmentReq{segID: 0})
|
||||||
assert.Equal(t, needToSync, fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Load())
|
assert.NoError(t, err)
|
||||||
}
|
fg.(*dataSyncService).channel.updateSegmentMemorySize(0, memorySize)
|
||||||
})
|
fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Store(false)
|
||||||
|
}
|
||||||
|
fm.execute(test.totalMemory)
|
||||||
|
for i, needToSync := range test.expectNeedToSync {
|
||||||
|
vchannel := fmt.Sprintf("%s%d", channelPrefix, i)
|
||||||
|
fg, ok := fm.flowgraphs.Load(vchannel)
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, needToSync, fg.(*dataSyncService).channel.(*ChannelMeta).needToSync.Load())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue