Fix watch dml channel fail because of no collection meta (#15436)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/15526/head
Xiaofan 2022-02-10 13:19:46 +08:00 committed by GitHub
parent 9e85fa3aed
commit a18d6b46a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 26 deletions

View File

@ -209,7 +209,7 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema
var newCollection = newCollection(collectionID, schema)
colReplica.collections[collectionID] = newCollection
log.Debug("Successfully add collection ", zap.Int64("collectionID", collectionID))
return newCollection
}

View File

@ -254,6 +254,10 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
zap.Strings("pChannels", pChannels),
)
// init collection meta
sCol := w.node.streaming.replica.addCollection(collectionID, w.req.Schema)
hCol := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
// load growing segments
unFlushedSegments := make([]*queryPb.SegmentLoadInfo, 0)
unFlushedSegmentIDs := make([]UniqueID, 0)
@ -287,7 +291,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
log.Debug("load growing segments done in WatchDmChannels",
log.Debug("successfully load growing segments done in WatchDmChannels",
zap.Int64("collectionID", collectionID),
zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
)
@ -407,10 +411,6 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
log.Debug("watchDMChannel, add flowGraph for dmChannels success", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
// init collection
sCol := w.node.streaming.replica.addCollection(collectionID, w.req.Schema)
hCol := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
sCol.addVChannels(vChannels)
sCol.addPChannels(pChannels)
sCol.setLoadType(lType)
@ -620,27 +620,15 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
for _, info := range l.req.Infos {
collectionID := info.CollectionID
partitionID := info.PartitionID
hasCollectionInHistorical := l.node.historical.replica.hasCollection(collectionID)
hasPartitionInHistorical := l.node.historical.replica.hasPartition(partitionID)
if !hasCollectionInHistorical {
l.node.historical.replica.addCollection(collectionID, l.req.Schema)
l.node.historical.replica.addCollection(collectionID, l.req.Schema)
err = l.node.historical.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
if !hasPartitionInHistorical {
err = l.node.historical.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}
hasCollectionInStreaming := l.node.streaming.replica.hasCollection(collectionID)
hasPartitionInStreaming := l.node.streaming.replica.hasPartition(partitionID)
if !hasCollectionInStreaming {
l.node.streaming.replica.addCollection(collectionID, l.req.Schema)
}
if !hasPartitionInStreaming {
err = l.node.streaming.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
l.node.streaming.replica.addCollection(collectionID, l.req.Schema)
err = l.node.streaming.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}