Clean flowGraph if watchChannel failed (#15303)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/15310/head
xige-16 2022-01-20 10:01:38 +08:00 committed by GitHub
parent 720ec7e0a5
commit c132302129
7 changed files with 220 additions and 261 deletions


@@ -53,7 +53,7 @@ type ReplicaInterface interface {
// getCollectionIDs returns all collection ids in the collectionReplica
getCollectionIDs() []UniqueID
// addCollection creates a new collection and adds it to collectionReplica; it returns the existing collection if one is already present
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection
// removeCollection removes the collection from collectionReplica
removeCollection(collectionID UniqueID) error
// getCollectionByID gets the collection whose id is collectionID
@@ -199,18 +199,18 @@ func (colReplica *collectionReplica) getCollectionIDs() []UniqueID {
}
// addCollection creates a new collection and adds it to collectionReplica; it returns the existing collection if one is already present
func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
if ok := colReplica.hasCollectionPrivate(collectionID); ok {
return fmt.Errorf("collection has been loaded, id %d", collectionID)
if col, ok := colReplica.collections[collectionID]; ok {
return col
}
var newCollection = newCollection(collectionID, schema)
colReplica.collections[collectionID] = newCollection
return nil
return newCollection
}
// removeCollection removes the collection from collectionReplica
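
For context, a minimal sketch of the add-or-get pattern this change adopts, assuming a simplified replica type (the real collectionReplica carries more state and methods than shown):

// Sketch only: UniqueID, Collection, newCollection and schemapb are the
// package's existing names; replica here stands in for collectionReplica.
type replica struct {
	mu          sync.Mutex // import "sync"
	collections map[UniqueID]*Collection
}

// addCollection is now idempotent: it returns the existing collection if one
// is already registered, otherwise it creates, registers and returns a new one.
func (r *replica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection {
	r.mu.Lock()
	defer r.mu.Unlock()
	if col, ok := r.collections[collectionID]; ok {
		return col
	}
	col := newCollection(collectionID, schema)
	r.collections[collectionID] = col
	return col
}

Since the call can no longer fail, callers drop their hasCollection checks and error branches, as the later hunks in this commit show.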


@@ -42,16 +42,18 @@ type dataSyncService struct {
}
// addFlowGraphsForDMLChannels adds flowGraphs to dmlChannel2FlowGraph and returns them keyed by channel
func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID UniqueID, dmlChannels []string) {
func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID UniqueID, dmlChannels []string) map[string]*queryNodeFlowGraph {
dsService.mu.Lock()
defer dsService.mu.Unlock()
results := make(map[string]*queryNodeFlowGraph)
for _, channel := range dmlChannels {
if _, ok := dsService.dmlChannel2FlowGraph[channel]; ok {
if fg, ok := dsService.dmlChannel2FlowGraph[channel]; ok {
log.Warn("dml flow graph has been existed",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel),
)
results[channel] = fg
continue
}
newFlowGraph := newQueryNodeFlowGraph(dsService.ctx,
@@ -64,20 +66,25 @@ func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID Uniqu
log.Debug("add DML flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel))
results[channel] = newFlowGraph
}
return results
}
// addFlowGraphsForDeltaChannels adds flowGraphs to deltaChannel2FlowGraph and returns them keyed by channel
func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID UniqueID, deltaChannels []string) {
func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID UniqueID, deltaChannels []string) map[string]*queryNodeFlowGraph {
dsService.mu.Lock()
defer dsService.mu.Unlock()
results := make(map[string]*queryNodeFlowGraph)
for _, channel := range deltaChannels {
if _, ok := dsService.deltaChannel2FlowGraph[channel]; ok {
if fg, ok := dsService.deltaChannel2FlowGraph[channel]; ok {
log.Warn("delta flow graph has been existed",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel),
)
results[channel] = fg
continue
}
newFlowGraph := newQueryNodeDeltaFlowGraph(dsService.ctx,
@@ -90,7 +97,10 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni
log.Debug("add delta flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel))
results[channel] = newFlowGraph
}
return results
}
// getFlowGraphByDMLChannel returns the DML flowGraph by channel
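
A hedged caller sketch of why the map return matters (names mirror the task code later in this commit): the watch task keeps direct references to exactly the flow graphs it created, so it can close and deregister them if a later consume or seek step fails.

var err error
channel2FlowGraph := dsService.addFlowGraphsForDMLChannels(collectionID, vChannels)
for _, channel := range vChannels {
	fg := channel2FlowGraph[channel]
	// use pChannel to consume; stop at the first failure
	if err = fg.consumerFlowGraph(VPChannels[channel], consumeSubName); err != nil {
		break
	}
}
if err != nil {
	// roll back: close every graph this call created or returned,
	// then drop them from dmlChannel2FlowGraph
	for _, fg := range channel2FlowGraph {
		fg.flowGraph.Close()
	}
	dsService.removeFlowGraphsByDMLChannels(vChannels)
}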


@@ -869,10 +869,7 @@ func genSimpleReplica() (ReplicaInterface, error) {
}
r := newCollectionReplica(kv)
schema := genSimpleSegCoreSchema()
err = r.addCollection(defaultCollectionID, schema)
if err != nil {
return nil, err
}
r.addCollection(defaultCollectionID, schema)
err = r.addPartition(defaultCollectionID, defaultPartitionID)
return r, err
}


@@ -144,8 +144,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
historical := newHistorical(context.Background(), historicalReplica, tsReplica)
//add a segment to historical data
err = historical.replica.addCollection(0, schema)
assert.Nil(t, err)
historical.replica.addCollection(0, schema)
err = historical.replica.addPartition(0, 1)
assert.Nil(t, err)
err = historical.replica.addSegment(2, 1, 0, "testChannel", segmentTypeSealed, true)
@@ -168,8 +167,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
//create a streaming
streaming := newStreaming(ctx, streamingReplica, factory, etcdKV, tsReplica)
err = streaming.replica.addCollection(0, schema)
assert.Nil(t, err)
streaming.replica.addCollection(0, schema)
err = streaming.replica.addPartition(0, 1)
assert.Nil(t, err)


@@ -147,8 +147,7 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentI
}
collectionMeta := genTestCollectionMeta(collectionID, isBinary)
var err = node.historical.replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
assert.NoError(t, err)
node.historical.replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
collection, err := node.historical.replica.getCollectionByID(collectionID)
assert.NoError(t, err)


@@ -270,8 +270,7 @@ func TestSegmentLoader_invalid(t *testing.T) {
}),
},
}
err = loader.historicalReplica.addCollection(defaultCollectionID, schema)
assert.NoError(t, err)
loader.historicalReplica.addCollection(defaultCollectionID, schema)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{


@@ -262,181 +262,6 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
zap.Strings("pChannels", pChannels),
)
// init replica
if hasCollectionInStreaming := w.node.streaming.replica.hasCollection(collectionID); !hasCollectionInStreaming {
err := w.node.streaming.replica.addCollection(collectionID, w.req.Schema)
if err != nil {
return err
}
}
// init replica
if hasCollectionInHistorical := w.node.historical.replica.hasCollection(collectionID); !hasCollectionInHistorical {
err := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
if err != nil {
return err
}
}
sCol, err := w.node.streaming.replica.getCollectionByID(collectionID)
if err != nil {
return err
}
sCol.addVChannels(vChannels)
sCol.addPChannels(pChannels)
sCol.setLoadType(lType)
hCol, err := w.node.historical.replica.getCollectionByID(collectionID)
if err != nil {
return err
}
hCol.addVChannels(vChannels)
hCol.addPChannels(pChannels)
hCol.setLoadType(lType)
if lType == loadTypePartition {
for _, partitionID := range partitionIDs {
sCol.deleteReleasedPartition(partitionID)
hCol.deleteReleasedPartition(partitionID)
if hasPartitionInStreaming := w.node.streaming.replica.hasPartition(partitionID); !hasPartitionInStreaming {
err := w.node.streaming.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}
if hasPartitionInHistorical := w.node.historical.replica.hasPartition(partitionID); !hasPartitionInHistorical {
err := w.node.historical.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}
}
}
log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID))
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// group channels by to seeking or consuming
toSeekChannels := make([]*internalpb.MsgPosition, 0)
toSubChannels := make([]Channel, 0)
for _, info := range w.req.Infos {
if info.SeekPosition == nil || len(info.SeekPosition.MsgID) == 0 {
toSubChannels = append(toSubChannels, info.ChannelName)
continue
}
info.SeekPosition.MsgGroup = consumeSubName
toSeekChannels = append(toSeekChannels, info.SeekPosition)
}
log.Debug("watchDMChannel, group channels done", zap.Int64("collectionID", collectionID))
// add excluded segments for unFlushed segments,
// unFlushed segments before check point should be filtered out.
unFlushedCheckPointInfos := make([]*datapb.SegmentInfo, 0)
for _, info := range w.req.Infos {
unFlushedCheckPointInfos = append(unFlushedCheckPointInfos, info.UnflushedSegments...)
}
w.node.streaming.replica.addExcludedSegments(collectionID, unFlushedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for unFlushed segments done",
zap.Int64("collectionID", collectionID),
zap.Any("unFlushedCheckPointInfos", unFlushedCheckPointInfos),
)
// add excluded segments for flushed segments,
// flushed segments with later check point than seekPosition should be filtered out.
flushedCheckPointInfos := make([]*datapb.SegmentInfo, 0)
for _, info := range w.req.Infos {
for _, flushedSegment := range info.FlushedSegments {
for _, position := range toSeekChannels {
if flushedSegment.DmlPosition != nil &&
flushedSegment.DmlPosition.ChannelName == position.ChannelName &&
flushedSegment.DmlPosition.Timestamp > position.Timestamp {
flushedCheckPointInfos = append(flushedCheckPointInfos, flushedSegment)
}
}
}
}
w.node.streaming.replica.addExcludedSegments(collectionID, flushedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for flushed segments done",
zap.Int64("collectionID", collectionID),
zap.Any("flushedCheckPointInfos", flushedCheckPointInfos),
)
// add excluded segments for dropped segments,
// dropped segments with later check point than seekPosition should be filtered out.
droppedCheckPointInfos := make([]*datapb.SegmentInfo, 0)
for _, info := range w.req.Infos {
for _, droppedSegment := range info.DroppedSegments {
for _, position := range toSeekChannels {
if droppedSegment != nil &&
droppedSegment.DmlPosition.ChannelName == position.ChannelName &&
droppedSegment.DmlPosition.Timestamp > position.Timestamp {
droppedCheckPointInfos = append(droppedCheckPointInfos, droppedSegment)
}
}
}
}
w.node.streaming.replica.addExcludedSegments(collectionID, droppedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for dropped segments done",
zap.Int64("collectionID", collectionID),
zap.Any("droppedCheckPointInfos", droppedCheckPointInfos),
)
// create tSafe
for _, channel := range vChannels {
w.node.tSafeReplica.addTSafe(channel)
}
// add flow graph
w.node.dataSyncService.addFlowGraphsForDMLChannels(collectionID, vChannels)
log.Debug("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels))
// add tSafe watcher if queryCollection exists
qc, err := w.node.queryService.getQueryCollection(collectionID)
if err == nil {
for _, channel := range vChannels {
err = qc.addTSafeWatcher(channel)
if err != nil {
// tSafe have been exist, not error
log.Warn(err.Error())
}
}
}
// channels as consumer
for _, channel := range toSubChannels {
fg, err := w.node.dataSyncService.getFlowGraphByDMLChannel(collectionID, channel)
if err != nil {
return errors.New("watchDmChannelsTask failed, error = " + err.Error())
}
// use pChannel to consume
err = fg.consumerFlowGraph(VPChannels[channel], consumeSubName)
if err != nil {
return errors.New("watchDmChannelsTask failed, msgStream consume error :" + err.Error())
}
}
log.Debug("as consumer channels",
zap.Int64("collectionID", collectionID),
zap.Strings("toSubChannels", toSubChannels))
// seek channel
for _, pos := range toSeekChannels {
fg, err := w.node.dataSyncService.getFlowGraphByDMLChannel(collectionID, pos.ChannelName)
if err != nil {
return errors.New("watchDmChannelsTask failed, error = " + err.Error())
}
pos.MsgGroup = consumeSubName
// use pChannel to seek
pChannel, ok := VPChannels[fg.channel]
if pChannel == "" || !ok {
log.Error("watch dm channel task found unmatched channel name", zap.Any("position", pos), zap.String("fg channel", fg.channel), zap.String("pchannel", pChannel))
return errors.New("empty pchannel found")
}
pos.ChannelName = pChannel
err = fg.seekQueryNodeFlowGraph(pos)
if err != nil {
return errors.New("msgStream seek error :" + err.Error())
}
}
log.Debug("Seek all channel done",
zap.Int64("collectionID", collectionID),
zap.Any("toSeekChannels", toSeekChannels))
// load growing segments
unFlushedSegments := make([]*queryPb.SegmentLoadInfo, 0)
unFlushedSegmentIDs := make([]UniqueID, 0)
@@ -466,7 +291,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
zap.Int64("collectionID", collectionID),
zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
)
err = w.node.loader.loadSegment(req, segmentTypeGrowing)
err := w.node.loader.loadSegment(req, segmentTypeGrowing)
if err != nil {
return err
}
@@ -475,15 +300,165 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
)
// start flow graphs
for _, channel := range vChannels {
err = w.node.dataSyncService.startFlowGraphByDMLChannel(collectionID, channel)
// remove growing segments if watching dmChannels failed
defer func() {
if err != nil {
return errors.New("watchDmChannelsTask failed, error = " + err.Error())
for _, segmentID := range unFlushedSegmentIDs {
w.node.streaming.replica.removeSegment(segmentID)
}
}
}()
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// group channels into seeking or consuming
channel2SeekPosition := make(map[string]*internalpb.MsgPosition)
channel2AsConsumerPosition := make(map[string]*internalpb.MsgPosition)
for _, info := range w.req.Infos {
if info.SeekPosition == nil || len(info.SeekPosition.MsgID) == 0 {
channel2AsConsumerPosition[info.ChannelName] = info.SeekPosition
continue
}
info.SeekPosition.MsgGroup = consumeSubName
channel2SeekPosition[info.ChannelName] = info.SeekPosition
}
log.Debug("watchDMChannel, group channels done", zap.Int64("collectionID", collectionID))
// add excluded segments for unFlushed segments,
// unFlushed segments before the checkpoint should be filtered out.
unFlushedCheckPointInfos := make([]*datapb.SegmentInfo, 0)
for _, info := range w.req.Infos {
unFlushedCheckPointInfos = append(unFlushedCheckPointInfos, info.UnflushedSegments...)
}
w.node.streaming.replica.addExcludedSegments(collectionID, unFlushedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for unFlushed segments done",
zap.Int64("collectionID", collectionID),
zap.Any("unFlushedCheckPointInfos", unFlushedCheckPointInfos),
)
// add excluded segments for flushed segments,
// flushed segments with a later checkpoint than seekPosition should be filtered out.
flushedCheckPointInfos := make([]*datapb.SegmentInfo, 0)
for _, info := range w.req.Infos {
for _, flushedSegment := range info.FlushedSegments {
for _, position := range channel2SeekPosition {
if flushedSegment.DmlPosition != nil &&
flushedSegment.DmlPosition.ChannelName == position.ChannelName &&
flushedSegment.DmlPosition.Timestamp > position.Timestamp {
flushedCheckPointInfos = append(flushedCheckPointInfos, flushedSegment)
}
}
}
}
w.node.streaming.replica.addExcludedSegments(collectionID, flushedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for flushed segments done",
zap.Int64("collectionID", collectionID),
zap.Any("flushedCheckPointInfos", flushedCheckPointInfos),
)
// add excluded segments for dropped segments,
// dropped segments with a later checkpoint than seekPosition should be filtered out.
droppedCheckPointInfos := make([]*datapb.SegmentInfo, 0)
for _, info := range w.req.Infos {
for _, droppedSegment := range info.DroppedSegments {
for _, position := range channel2SeekPosition {
if droppedSegment != nil &&
droppedSegment.DmlPosition.ChannelName == position.ChannelName &&
droppedSegment.DmlPosition.Timestamp > position.Timestamp {
droppedCheckPointInfos = append(droppedCheckPointInfos, droppedSegment)
}
}
}
}
w.node.streaming.replica.addExcludedSegments(collectionID, droppedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for dropped segments done",
zap.Int64("collectionID", collectionID),
zap.Any("droppedCheckPointInfos", droppedCheckPointInfos),
)
// add flow graph
channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDMLChannels(collectionID, vChannels)
log.Debug("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels))
// channels as consumer
for _, channel := range vChannels {
fg := channel2FlowGraph[channel]
if _, ok := channel2AsConsumerPosition[channel]; ok {
// use pChannel to consume
err = fg.consumerFlowGraph(VPChannels[channel], consumeSubName)
if err != nil {
log.Error("msgStream as consumer failed for dmChannels", zap.Int64("collectionID", collectionID), zap.String("vChannel", channel))
break
}
}
if pos, ok := channel2SeekPosition[channel]; ok {
pos.MsgGroup = consumeSubName
// use pChannel to seek
pos.ChannelName = VPChannels[channel]
err = fg.seekQueryNodeFlowGraph(pos)
if err != nil {
log.Error("msgStream seek failed for dmChannels", zap.Int64("collectionID", collectionID), zap.String("vChannel", channel))
break
}
}
}
log.Debug("WatchDmChannels done", zap.Strings("ChannelIDs", vChannels))
if err != nil {
log.Warn("watchDMChannel, add flowGraph for dmChannels failed", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels), zap.Error(err))
for _, fg := range channel2FlowGraph {
fg.flowGraph.Close()
}
w.node.dataSyncService.removeFlowGraphsByDMLChannels(vChannels)
return err
}
log.Debug("watchDMChannel, add flowGraph for dmChannels success", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
// init collection
sCol := w.node.streaming.replica.addCollection(collectionID, w.req.Schema)
hCol := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
sCol.addVChannels(vChannels)
sCol.addPChannels(pChannels)
sCol.setLoadType(lType)
hCol.addVChannels(vChannels)
hCol.addPChannels(pChannels)
hCol.setLoadType(lType)
if lType == loadTypePartition {
for _, partitionID := range partitionIDs {
sCol.deleteReleasedPartition(partitionID)
hCol.deleteReleasedPartition(partitionID)
w.node.streaming.replica.addPartition(collectionID, partitionID)
w.node.historical.replica.addPartition(collectionID, partitionID)
}
}
log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
// create tSafe
for _, channel := range vChannels {
w.node.tSafeReplica.addTSafe(channel)
}
// add tSafe watcher if queryCollection exists
qc, err := w.node.queryService.getQueryCollection(collectionID)
if err == nil {
for _, channel := range vChannels {
err = qc.addTSafeWatcher(channel)
if err != nil {
// tSafe already exists, not an error
log.Warn(err.Error())
}
}
}
// start flow graphs
for _, fg := range channel2FlowGraph {
fg.flowGraph.Start()
}
log.Debug("WatchDmChannels done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
return nil
}
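
Condensed, the new control flow of watchDmChannelsTask.Execute is roughly the sketch below (helper names are hypothetical): a named return value lets the deferred cleanup observe whether any later step failed, and replica state is only mutated, and the graphs only started, after every fallible step has succeeded.

// Hypothetical condensed shape of watchDmChannelsTask.Execute after this change.
func (w *watchDmChannelsTask) execute() (err error) {
	// 1. load growing segments; the deferred rollback removes them
	//    again if anything after this point fails
	if err = w.loadGrowingSegments(); err != nil {
		return err
	}
	defer func() {
		if err != nil {
			w.removeGrowingSegments()
		}
	}()
	// 2. create the flow graphs, then wire consumers and seek positions;
	//    on failure, close and deregister the graphs before returning
	graphs := w.addFlowGraphs()
	if err = w.consumeAndSeek(graphs); err != nil {
		w.closeAndRemoveFlowGraphs(graphs)
		return err
	}
	// 3. only now touch replica state: init collections and partitions,
	//    create tSafe, then start the flow graphs
	w.initReplicaAndTSafe()
	w.startFlowGraphs(graphs)
	return nil
}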
@@ -520,12 +495,14 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
vDeltaChannels := make([]Channel, 0)
pDeltaChannels := make([]Channel, 0)
VPDeltaChannels := make(map[string]string) // map[vChannel]pChannel
vChannel2SeekPosition := make(map[string]*internalpb.MsgPosition)
for _, info := range w.req.Infos {
v := info.ChannelName
p := rootcoord.ToPhysicalChannel(info.ChannelName)
vDeltaChannels = append(vDeltaChannels, v)
pDeltaChannels = append(pDeltaChannels, p)
VPDeltaChannels[v] = p
vChannel2SeekPosition[v] = info.SeekPosition
}
log.Debug("Starting WatchDeltaChannels ...",
zap.Any("collectionID", collectionID),
@@ -547,18 +524,6 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
return err
}
// Check if the same deltaChannel has been watched
for _, dstChan := range vDeltaChannels {
for _, srcChan := range hCol.vDeltaChannels {
if dstChan == srcChan {
return nil
}
}
}
hCol.addVDeltaChannels(vDeltaChannels)
hCol.addPDeltaChannels(pDeltaChannels)
if hasCollectionInStreaming := w.node.streaming.replica.hasCollection(collectionID); !hasCollectionInStreaming {
return fmt.Errorf("cannot find collection with collectionID, %d", collectionID)
}
@@ -566,25 +531,47 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
channel2FlowGraph := w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// channels as consumer
for _, channel := range vDeltaChannels {
fg := channel2FlowGraph[channel]
// use pChannel to consume
err = fg.consumerFlowGraphLatest(VPDeltaChannels[channel], consumeSubName)
if err != nil {
log.Error("msgStream as consumer failed for deltaChannels", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
break
}
err = w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, vChannel2SeekPosition[channel])
if err != nil {
log.Error("watchDeltaChannelsTask from dml cp load delete failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
break
}
}
if err != nil {
log.Warn("watchDeltaChannel, add flowGraph for deltaChannel failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err))
for _, fg := range channel2FlowGraph {
fg.flowGraph.Close()
}
w.node.dataSyncService.removeFlowGraphsByDeltaChannels(vDeltaChannels)
return err
}
log.Debug("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
// set collection replica
hCol.addVDeltaChannels(vDeltaChannels)
hCol.addPDeltaChannels(pDeltaChannels)
sCol.addVDeltaChannels(vDeltaChannels)
sCol.addPDeltaChannels(pDeltaChannels)
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
// group channels by to seeking or consuming
toSubChannels := make([]Channel, 0)
for _, info := range w.req.Infos {
toSubChannels = append(toSubChannels, info.ChannelName)
}
log.Debug("watchDeltaChannel, group channels done", zap.Any("collectionID", collectionID))
// create tSafe
for _, channel := range vDeltaChannels {
w.node.tSafeReplica.addTSafe(channel)
}
w.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels)
// add tSafe watcher if queryCollection exists
qc, err := w.node.queryService.getQueryCollection(collectionID)
if err == nil {
@@ -597,37 +584,12 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
}
}
// channels as consumer
for _, channel := range toSubChannels {
fg, err := w.node.dataSyncService.getFlowGraphByDeltaChannel(collectionID, channel)
if err != nil {
return errors.New("watchDeltaChannelsTask failed, error = " + err.Error())
}
// use pChannel to consume
err = fg.consumerFlowGraphLatest(VPDeltaChannels[channel], consumeSubName)
if err != nil {
return errors.New("watchDeltaChannelsTask failed, msgStream consume error :" + err.Error())
}
}
log.Debug("as consumer channels",
zap.Any("collectionID", collectionID),
zap.Any("toSubChannels", toSubChannels))
for _, info := range w.req.Infos {
if err := w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, info.SeekPosition); err != nil {
return errors.New("watchDeltaChannelsTask from dml cp load delete failed, error = " + err.Error())
}
}
// start flow graphs
for _, channel := range vDeltaChannels {
err = w.node.dataSyncService.startFlowGraphForDeltaChannel(collectionID, channel)
if err != nil {
return errors.New("watchDeltaChannelsTask failed, error = " + err.Error())
}
for _, fg := range channel2FlowGraph {
fg.flowGraph.Start()
}
log.Debug("WatchDeltaChannels done", zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels)))
log.Debug("WatchDeltaChannels done", zap.Int64("collectionID", collectionID), zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels)))
return nil
}
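
The delta-channel path applies the same rollback. One ordering detail worth noting (a reading of the code above, not documented behavior): each graph first subscribes at the latest position, then FromDmlCPLoadDelete replays deletes from the DML checkpoint, so deletes between the checkpoint and the subscription point are recovered by the replay while newer ones arrive through the running graph:

// Condensed from the loop above; on the first failure the loop breaks and the
// shared err triggers flowGraph.Close plus removeFlowGraphsByDeltaChannels.
for _, channel := range vDeltaChannels {
	fg := channel2FlowGraph[channel]
	// subscribe on the physical channel at the latest position
	if err = fg.consumerFlowGraphLatest(VPDeltaChannels[channel], consumeSubName); err != nil {
		break
	}
	// replay historical deletes from the DML checkpoint
	if err = w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, vChannel2SeekPosition[channel]); err != nil {
		break
	}
}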
@@ -669,10 +631,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
hasCollectionInHistorical := l.node.historical.replica.hasCollection(collectionID)
hasPartitionInHistorical := l.node.historical.replica.hasPartition(partitionID)
if !hasCollectionInHistorical {
err = l.node.historical.replica.addCollection(collectionID, l.req.Schema)
if err != nil {
return err
}
l.node.historical.replica.addCollection(collectionID, l.req.Schema)
}
if !hasPartitionInHistorical {
err = l.node.historical.replica.addPartition(collectionID, partitionID)
@@ -683,10 +642,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
hasCollectionInStreaming := l.node.streaming.replica.hasCollection(collectionID)
hasPartitionInStreaming := l.node.streaming.replica.hasPartition(partitionID)
if !hasCollectionInStreaming {
err = l.node.streaming.replica.addCollection(collectionID, l.req.Schema)
if err != nil {
return err
}
l.node.streaming.replica.addCollection(collectionID, l.req.Schema)
}
if !hasPartitionInStreaming {
err = l.node.streaming.replica.addPartition(collectionID, partitionID)