fix: Cleanup write buffer when flowgraph released (#31376)

See also #30137

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/31437/head
congqixia 2024-03-19 01:33:05 +08:00 committed by GitHub
parent 8c43c5b6cb
commit d9efea2fea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 372 additions and 8 deletions

View File

@ -22,8 +22,11 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -41,6 +44,7 @@ type ChannelManagerSuite struct {
func (s *ChannelManagerSuite) SetupTest() {
ctx := context.Background()
s.node = newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
s.node.allocator = allocator.NewMockAllocator(s.T())
s.manager = NewChannelManager(s.node)
}
@ -52,6 +56,26 @@ func getWatchInfoByOpID(opID UniqueID, channel string, state datapb.ChannelWatch
CollectionID: 1,
ChannelName: channel,
},
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
}

View File

@ -318,6 +318,7 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
log.Info("try to release flowgraph", zap.String("vChanName", vChanName))
node.flowgraphManager.RemoveFlowgraph(vChanName)
node.writeBufferManager.RemoveChannel(vChanName)
}
// BackGroundGC runs in background to release datanode resources

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -206,7 +207,28 @@ func TestDataNode(t *testing.T) {
}
for _, test := range testDataSyncs {
err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{
CollectionID: 1, ChannelName: test.dmChannelName,
}, &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}, genTestTickler())
assert.NoError(t, err)
vchanNameCh <- test.dmChannelName
}

View File

@ -350,7 +350,11 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
resendTTCh = make(chan resendTTMsg, 100)
)
node.writeBufferManager.Register(channelName, metacache, storageV2Cache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker, config.serverID)), writebuffer.WithIDAllocator(node.allocator))
err := node.writeBufferManager.Register(channelName, metacache, storageV2Cache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker, config.serverID)), writebuffer.WithIDAllocator(node.allocator))
if err != nil {
log.Warn("failed to register channel buffer", zap.Error(err))
return nil, err
}
ctx, cancel := context.WithCancel(node.ctx)
ds := &dataSyncService{
ctx: ctx,

View File

@ -61,6 +61,26 @@ func init() {
func getWatchInfo(info *testInfo) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
Vchan: getVchanInfo(info),
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
}
@ -157,10 +177,12 @@ func TestDataSyncService_newDataSyncService(t *testing.T) {
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
node.allocator = allocator.NewMockAllocator(t)
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
node.factory = test.inMsgFactory
defer node.tryToReleaseFlowgraph(test.chanName)
ds, err := newServiceWithEtcdTickler(
ctx,
node,
@ -183,6 +205,39 @@ func TestDataSyncService_newDataSyncService(t *testing.T) {
}
}
func TestDataSyncService_newDataSyncService_DuplicatedChannel(t *testing.T) {
ctx := context.Background()
test := &testInfo{
true, false, &mockMsgStreamFactory{true, true},
1, "by-dev-rootcoord-dml-test_v1",
1, 1, "by-dev-rootcoord-dml-test_v1", 0,
1, 2, "by-dev-rootcoord-dml-test_v1", 0,
"add un-flushed and flushed segments",
}
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
watchInfo := getWatchInfo(test)
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
node.allocator = allocator.NewMockAllocator(t)
node.factory = test.inMsgFactory
metacache := metacache.NewMockMetaCache(t)
metacache.EXPECT().Collection().Return(test.collID)
metacache.EXPECT().Schema().Return(watchInfo.GetSchema())
node.writeBufferManager.Register(test.chanName, metacache, nil, writebuffer.WithIDAllocator(allocator.NewMockAllocator(t)))
ds, err := newServiceWithEtcdTickler(
ctx,
node,
watchInfo,
genTestTickler(),
)
assert.Error(t, err)
assert.Nil(t, ds)
}
func genBytes() (rawData []byte) {
const DIM = 2
const N = 1

View File

@ -30,10 +30,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -92,6 +94,26 @@ func TestWatchChannel(t *testing.T) {
info := &datapb.ChannelWatchInfo{
State: datapb.ChannelWatchState_ToWatch,
Vchan: vchan,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
val, err := proto.Marshal(info)
assert.NoError(t, err)
@ -162,6 +184,26 @@ func TestWatchChannel(t *testing.T) {
info := &datapb.ChannelWatchInfo{
State: datapb.ChannelWatchState_ToRelease,
Vchan: vchan,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
val, err := proto.Marshal(info)
assert.NoError(t, err)
@ -195,6 +237,26 @@ func TestWatchChannel(t *testing.T) {
info := datapb.ChannelWatchInfo{
Vchan: nil,
State: datapb.ChannelWatchState_Uncomplete,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
bs, err := proto.Marshal(&info)
assert.NoError(t, err)
@ -225,6 +287,26 @@ func TestWatchChannel(t *testing.T) {
info = datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{ChannelName: ch},
State: datapb.ChannelWatchState_Uncomplete,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
bs, err = proto.Marshal(&info)
assert.NoError(t, err)
@ -268,6 +350,26 @@ func TestWatchChannel(t *testing.T) {
info := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{ChannelName: ch},
State: datapb.ChannelWatchState_Uncomplete,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
bs, err := proto.Marshal(&info)
assert.NoError(t, err)
@ -289,6 +391,26 @@ func TestWatchChannel(t *testing.T) {
UnflushedSegmentIds: []int64{1},
},
State: datapb.ChannelWatchState_Uncomplete,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
bs, err := proto.Marshal(&info)
assert.NoError(t, err)
@ -449,6 +571,26 @@ func TestEventTickler(t *testing.T) {
Vchan: &datapb.VchannelInfo{
ChannelName: channelName,
},
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}, kv, 100*time.Millisecond)
defer tickler.stop()
endCh := make(chan struct{}, 1)

View File

@ -24,10 +24,12 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
)
@ -83,7 +85,26 @@ func TestFlowGraphManager(t *testing.T) {
}
require.False(t, fm.HasFlowgraph(vchanName))
err := fm.AddandStartWithEtcdTickler(node, vchan, nil, genTestTickler())
err := fm.AddandStartWithEtcdTickler(node, vchan, &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}, genTestTickler())
assert.NoError(t, err)
assert.True(t, fm.HasFlowgraph(vchanName))
@ -98,7 +119,26 @@ func TestFlowGraphManager(t *testing.T) {
}
require.False(t, fm.HasFlowgraph(vchanName))
err := fm.AddandStartWithEtcdTickler(node, vchan, nil, genTestTickler())
err := fm.AddandStartWithEtcdTickler(node, vchan, &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}, genTestTickler())
assert.NoError(t, err)
assert.True(t, fm.HasFlowgraph(vchanName))

View File

@ -541,14 +541,52 @@ func (s *DataNodeServicesSuite) TestImport() {
ChannelName: chName1,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
}, &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}, genTestTickler())
s.Require().Nil(err)
err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName2,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
}, &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}, genTestTickler())
s.Require().Nil(err)
_, ok := s.node.flowgraphManager.GetFlowgraphService(chName1)
@ -604,14 +642,52 @@ func (s *DataNodeServicesSuite) TestImport() {
ChannelName: chName1,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
}, &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}, genTestTickler())
s.Require().Nil(err)
err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{
CollectionID: 999, // wrong collection ID.
ChannelName: chName2,
UnflushedSegmentIds: []int64{},
FlushedSegmentIds: []int64{},
}, nil, genTestTickler())
}, &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}, genTestTickler())
s.Require().Nil(err)
_, ok := s.node.flowgraphManager.GetFlowgraphService(chName1)