mirror of https://github.com/milvus-io/milvus.git
Add collection delta channel in mutex protection (#20736)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/20721/head
parent
a8c7199a52
commit
e6c8f9cdee
|
@ -96,21 +96,30 @@ func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID Uniqu
|
|||
}
|
||||
|
||||
// addFlowGraphsForDeltaChannels add flowGraphs to deltaChannel2FlowGraph
|
||||
func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID UniqueID, deltaChannels []string) (map[string]*queryNodeFlowGraph, error) {
|
||||
func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID UniqueID, deltaChannels []string, VPDeltaChannels map[string]string) (map[string]*queryNodeFlowGraph, error) {
|
||||
log := log.With(
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Strings("deltaChannels", deltaChannels),
|
||||
)
|
||||
|
||||
dsService.mu.Lock()
|
||||
defer dsService.mu.Unlock()
|
||||
|
||||
_, err := dsService.metaReplica.getCollectionByID(collectionID)
|
||||
coll, err := dsService.metaReplica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// filter out duplicated channels
|
||||
vDeltaChannels := coll.AddVDeltaChannels(deltaChannels, VPDeltaChannels)
|
||||
if len(vDeltaChannels) == 0 {
|
||||
return map[string]*queryNodeFlowGraph{}, nil
|
||||
}
|
||||
|
||||
results := make(map[string]*queryNodeFlowGraph)
|
||||
for _, channel := range deltaChannels {
|
||||
for _, channel := range vDeltaChannels {
|
||||
if _, ok := dsService.deltaChannel2FlowGraph[channel]; ok {
|
||||
log.Warn("delta flow graph has been existed",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("channel", channel),
|
||||
zap.String("channel", channel),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
@ -121,8 +130,9 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni
|
|||
channel,
|
||||
dsService.msFactory)
|
||||
if err != nil {
|
||||
for _, fg := range results {
|
||||
for channel, fg := range results {
|
||||
fg.flowGraph.Close()
|
||||
coll.removeVDeltaChannel(channel)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
@ -132,8 +142,8 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni
|
|||
for channel, fg := range results {
|
||||
dsService.deltaChannel2FlowGraph[channel] = fg
|
||||
log.Info("add delta flow graph",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("channel", channel))
|
||||
zap.String("channel", channel),
|
||||
)
|
||||
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
|
||||
}
|
||||
|
||||
|
@ -268,11 +278,12 @@ func (dsService *dataSyncService) removeEmptyFlowGraphByChannel(collectionID int
|
|||
|
||||
// start to release flow graph
|
||||
log.Info("all segments released, start to remove deltaChannel flowgraph")
|
||||
fg.close()
|
||||
delete(dsService.deltaChannel2FlowGraph, dc)
|
||||
dsService.metaReplica.removeCollectionVDeltaChannel(collectionID, dc)
|
||||
// close flowgraph first, so no write will be dispatched to tSafeReplica
|
||||
fg.close()
|
||||
dsService.tSafeReplica.removeTSafe(dc)
|
||||
// try best to remove, it's ok if all info is gone before this call
|
||||
dsService.metaReplica.removeCollectionVDeltaChannel(collectionID, dc)
|
||||
rateCol.removeTSafeChannel(dc)
|
||||
}
|
||||
|
||||
|
|
|
@ -105,11 +105,11 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
|
|||
assert.NotNil(t, dataSyncService)
|
||||
|
||||
t.Run("test DeltaFlowGraphs", func(t *testing.T) {
|
||||
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
|
||||
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}, map[string]string{defaultDeltaChannel: defaultDeltaChannel})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
|
||||
|
||||
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel})
|
||||
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}, map[string]string{defaultDeltaChannel: defaultDeltaChannel})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
|
||||
|
||||
|
@ -127,7 +127,7 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
|
|||
assert.Nil(t, fg)
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel})
|
||||
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}, map[string]string{defaultDMLChannel: defaultDMLChannel})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
|
||||
|
||||
|
@ -147,7 +147,7 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
|
|||
t.Run("test addFlowGraphsForDeltaChannels checkReplica Failed", func(t *testing.T) {
|
||||
err = dataSyncService.metaReplica.removeCollection(defaultCollectionID)
|
||||
assert.NoError(t, err)
|
||||
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel})
|
||||
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}, map[string]string{defaultDMLChannel: defaultDMLChannel})
|
||||
assert.Error(t, err)
|
||||
dataSyncService.metaReplica.addCollection(defaultCollectionID, genTestCollectionSchema())
|
||||
})
|
||||
|
@ -201,7 +201,7 @@ func (s *DataSyncServiceSuite) TestRemoveEmptyFlowgraphByChannel() {
|
|||
err = s.dsService.metaReplica.addSegment(defaultSegmentID, defaultPartitionID, defaultCollectionID, channelName, defaultSegmentVersion, defaultSegmentStartPosition, segmentTypeSealed)
|
||||
s.Require().NoError(err)
|
||||
|
||||
_, err = s.dsService.addFlowGraphsForDeltaChannels(defaultCollectionID, []string{deltaChannelName})
|
||||
_, err = s.dsService.addFlowGraphsForDeltaChannels(defaultCollectionID, []string{deltaChannelName}, map[string]string{deltaChannelName: deltaChannelName})
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.Assert().NotPanics(func() {
|
||||
|
@ -217,7 +217,7 @@ func (s *DataSyncServiceSuite) TestRemoveEmptyFlowgraphByChannel() {
|
|||
deltaChannelName, err := funcutil.ConvertChannelName(channelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
|
||||
s.Require().NoError(err)
|
||||
|
||||
_, err = s.dsService.addFlowGraphsForDeltaChannels(defaultCollectionID, []string{deltaChannelName})
|
||||
_, err = s.dsService.addFlowGraphsForDeltaChannels(defaultCollectionID, []string{deltaChannelName}, map[string]string{deltaChannelName: deltaChannelName})
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.Assert().NotPanics(func() {
|
||||
|
|
|
@ -134,14 +134,20 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// internal helper function to subscribe delta channel
|
||||
func (l *loadSegmentsTask) watchDeltaChannel(vchanName []string) error {
|
||||
func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
|
||||
collectionID := l.req.CollectionID
|
||||
log := log.With(
|
||||
zap.Int64("collectionID", collectionID),
|
||||
)
|
||||
var vDeltaChannels []string
|
||||
VPDeltaChannels := make(map[string]string)
|
||||
for _, v := range vchanName {
|
||||
for _, v := range deltaChannels {
|
||||
dc, err := funcutil.ConvertChannelName(v, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
|
||||
if err != nil {
|
||||
log.Warn("watchDeltaChannels, failed to convert deltaChannel from dmlChannel", zap.String("DmlChannel", v), zap.Error(err))
|
||||
log.Warn("watchDeltaChannels, failed to convert deltaChannel from dmlChannel",
|
||||
zap.String("DmlChannel", v),
|
||||
zap.Error(err),
|
||||
)
|
||||
return err
|
||||
}
|
||||
p := funcutil.ToPhysicalChannel(dc)
|
||||
|
@ -149,8 +155,7 @@ func (l *loadSegmentsTask) watchDeltaChannel(vchanName []string) error {
|
|||
VPDeltaChannels[dc] = p
|
||||
}
|
||||
log.Info("Starting WatchDeltaChannels ...",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Any("channels", VPDeltaChannels),
|
||||
zap.Strings("channels", vDeltaChannels),
|
||||
)
|
||||
|
||||
coll, err := l.node.metaReplica.getCollectionByID(collectionID)
|
||||
|
@ -158,8 +163,25 @@ func (l *loadSegmentsTask) watchDeltaChannel(vchanName []string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// filter out duplicated channels
|
||||
vDeltaChannels = coll.AddVDeltaChannels(vDeltaChannels, VPDeltaChannels)
|
||||
// add collection meta and fg with mutex protection.
|
||||
channel2FlowGraph, err := l.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels, VPDeltaChannels)
|
||||
if err != nil {
|
||||
log.Warn("watchDeltaChannel, add flowGraph for deltaChannel failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if len(channel2FlowGraph) == 0 {
|
||||
log.Warn("all delta channels have been added before",
|
||||
zap.Strings("deltaChannels", deltaChannels),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// use valid channel names only.
|
||||
vDeltaChannels = make([]string, 0, len(channel2FlowGraph))
|
||||
for ch := range channel2FlowGraph {
|
||||
vDeltaChannels = append(vDeltaChannels, ch)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
for _, vDeltaChannel := range vDeltaChannels {
|
||||
|
@ -167,17 +189,6 @@ func (l *loadSegmentsTask) watchDeltaChannel(vchanName []string) error {
|
|||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if len(vDeltaChannels) == 0 {
|
||||
log.Warn("all delta channels has be added before, ignore watch delta requests")
|
||||
return nil
|
||||
}
|
||||
|
||||
channel2FlowGraph, err := l.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
|
||||
if err != nil {
|
||||
log.Warn("watchDeltaChannel, add flowGraph for deltaChannel failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, paramtable.GetNodeID())
|
||||
|
||||
// channels as consumer
|
||||
|
|
Loading…
Reference in New Issue