mirror of https://github.com/milvus-io/milvus.git
parent
98e4ff33a8
commit
148dad23ac
|
@ -250,17 +250,16 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
pChannels = append(pChannels, p)
|
||||
VPChannels[v] = p
|
||||
}
|
||||
log.Debug("Starting WatchDmChannels ...",
|
||||
zap.Any("collectionName", w.req.Schema.Name),
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("vChannels", vChannels),
|
||||
zap.Any("pChannels", pChannels),
|
||||
)
|
||||
|
||||
if len(VPChannels) != len(vChannels) {
|
||||
return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID))
|
||||
}
|
||||
log.Debug("Get physical channels done",
|
||||
zap.Any("collectionID", collectionID),
|
||||
|
||||
log.Debug("Starting WatchDmChannels ...",
|
||||
zap.String("collectionName", w.req.Schema.Name),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Strings("vChannels", vChannels),
|
||||
zap.Strings("pChannels", pChannels),
|
||||
)
|
||||
|
||||
// init replica
|
||||
|
@ -309,7 +308,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
log.Debug("watchDMChannel, init replica done", zap.Any("collectionID", collectionID))
|
||||
log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID))
|
||||
|
||||
// get subscription name
|
||||
getUniqueSubName := func() string {
|
||||
|
@ -329,7 +328,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
info.SeekPosition.MsgGroup = consumeSubName
|
||||
toSeekChannels = append(toSeekChannels, info.SeekPosition)
|
||||
}
|
||||
log.Debug("watchDMChannel, group channels done", zap.Any("collectionID", collectionID))
|
||||
log.Debug("watchDMChannel, group channels done", zap.Int64("collectionID", collectionID))
|
||||
|
||||
// add excluded segments for unFlushed segments,
|
||||
// unFlushed segments before check point should be filtered out.
|
||||
|
@ -339,7 +338,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
w.node.streaming.replica.addExcludedSegments(collectionID, unFlushedCheckPointInfos)
|
||||
log.Debug("watchDMChannel, add check points info for unFlushed segments done",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Any("unFlushedCheckPointInfos", unFlushedCheckPointInfos),
|
||||
)
|
||||
|
||||
|
@ -359,7 +358,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
w.node.streaming.replica.addExcludedSegments(collectionID, flushedCheckPointInfos)
|
||||
log.Debug("watchDMChannel, add check points info for flushed segments done",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Any("flushedCheckPointInfos", flushedCheckPointInfos),
|
||||
)
|
||||
|
||||
|
@ -379,7 +378,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
w.node.streaming.replica.addExcludedSegments(collectionID, droppedCheckPointInfos)
|
||||
log.Debug("watchDMChannel, add check points info for dropped segments done",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Any("droppedCheckPointInfos", droppedCheckPointInfos),
|
||||
)
|
||||
|
||||
|
@ -390,7 +389,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
|
||||
// add flow graph
|
||||
w.node.dataSyncService.addFlowGraphsForDMLChannels(collectionID, vChannels)
|
||||
log.Debug("Query node add DML flow graphs", zap.Any("channels", vChannels))
|
||||
log.Debug("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels))
|
||||
|
||||
// add tSafe watcher if queryCollection exists
|
||||
qc, err := w.node.queryService.getQueryCollection(collectionID)
|
||||
|
@ -417,8 +416,8 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
log.Debug("as consumer channels",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("toSubChannels", toSubChannels))
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Strings("toSubChannels", toSubChannels))
|
||||
|
||||
// seek channel
|
||||
for _, pos := range toSeekChannels {
|
||||
|
@ -440,7 +439,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
log.Debug("Seek all channel done",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Any("toSeekChannels", toSeekChannels))
|
||||
|
||||
// load growing segments
|
||||
|
@ -469,16 +468,16 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
Schema: w.req.Schema,
|
||||
}
|
||||
log.Debug("loading growing segments in WatchDmChannels...",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("unFlushedSegmentIDs", unFlushedSegmentIDs),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
|
||||
)
|
||||
err = w.node.loader.loadSegment(req, segmentTypeGrowing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("load growing segments done in WatchDmChannels",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("unFlushedSegmentIDs", unFlushedSegmentIDs),
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
|
||||
)
|
||||
|
||||
// start flow graphs
|
||||
|
@ -489,7 +488,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(vChannels)))
|
||||
log.Debug("WatchDmChannels done", zap.Strings("ChannelIDs", vChannels))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue