mirror of https://github.com/milvus-io/milvus.git
parent
6db06ecf81
commit
70eb275ef4
|
@ -126,7 +126,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
pChannels = append(pChannels, p)
|
||||
VPChannels[v] = p
|
||||
}
|
||||
log.Debug("starting WatchDmChannels ...",
|
||||
log.Debug("Starting WatchDmChannels ...",
|
||||
zap.Any("collectionName", w.req.Schema.Name),
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("vChannels", vChannels),
|
||||
|
@ -135,7 +135,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
if len(VPChannels) != len(vChannels) {
|
||||
return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID))
|
||||
}
|
||||
log.Debug("get physical channels done",
|
||||
log.Debug("Get physical channels done",
|
||||
zap.Any("collectionID", collectionID),
|
||||
)
|
||||
|
||||
|
@ -226,10 +226,10 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
// add flow graph
|
||||
if loadPartition {
|
||||
w.node.streaming.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels)
|
||||
log.Debug("query node add partition flow graphs", zap.Any("channels", vChannels))
|
||||
log.Debug("Query node add partition flow graphs", zap.Any("channels", vChannels))
|
||||
} else {
|
||||
w.node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, vChannels)
|
||||
log.Debug("query node add collection flow graphs", zap.Any("channels", vChannels))
|
||||
log.Debug("Query node add collection flow graphs", zap.Any("channels", vChannels))
|
||||
}
|
||||
|
||||
// add tSafe watcher if queryCollection exists
|
||||
|
@ -290,7 +290,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
log.Debug("seek all channel done",
|
||||
log.Debug("Seek all channel done",
|
||||
zap.Any("collectionID", collectionID),
|
||||
zap.Any("toSeekChannels", toSeekChannels))
|
||||
|
||||
|
@ -339,7 +339,7 @@ func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
|
|||
|
||||
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
|
||||
// TODO: support db
|
||||
log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(l.req)))
|
||||
log.Debug("Query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(l.req)))
|
||||
var err error
|
||||
|
||||
// init meta
|
||||
|
@ -444,7 +444,7 @@ func (r *releaseCollectionTask) PreExecute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (r *releaseCollectionTask) Execute(ctx context.Context) error {
|
||||
log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID))
|
||||
log.Debug("Execute release collection task", zap.Any("collectionID", r.req.CollectionID))
|
||||
errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
|
||||
collection, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
|
||||
if err != nil {
|
||||
|
@ -458,7 +458,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
|
|||
// sleep to wait for query tasks done
|
||||
const gracefulReleaseTime = 1
|
||||
time.Sleep(gracefulReleaseTime * time.Second)
|
||||
log.Debug("starting release collection...",
|
||||
log.Debug("Starting release collection...",
|
||||
zap.Any("collectionID", r.req.CollectionID),
|
||||
)
|
||||
|
||||
|
@ -477,7 +477,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
|
|||
|
||||
// remove all tSafes of the target collection
|
||||
for _, channel := range collection.getVChannels() {
|
||||
log.Debug("releasing tSafe in releaseCollectionTask...",
|
||||
log.Debug("Releasing tSafe in releaseCollectionTask...",
|
||||
zap.Any("collectionID", r.req.CollectionID),
|
||||
zap.Any("vChannel", channel),
|
||||
)
|
||||
|
@ -546,7 +546,7 @@ func (r *releasePartitionsTask) PreExecute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (r *releasePartitionsTask) Execute(ctx context.Context) error {
|
||||
log.Debug("receive release partition task",
|
||||
log.Debug("Execute release partition task",
|
||||
zap.Any("collectionID", r.req.CollectionID),
|
||||
zap.Any("partitionIDs", r.req.PartitionIDs))
|
||||
errMsg := "release partitions failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
|
||||
|
@ -574,7 +574,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
|
|||
r.node.streaming.dataSyncService.removePartitionFlowGraph(id)
|
||||
// remove all tSafes of the target partition
|
||||
for _, channel := range vChannels {
|
||||
log.Debug("releasing tSafe in releasePartitionTask...",
|
||||
log.Debug("Releasing tSafe in releasePartitionTask...",
|
||||
zap.Any("collectionID", r.req.CollectionID),
|
||||
zap.Any("partitionID", id),
|
||||
zap.Any("vChannel", channel),
|
||||
|
@ -613,7 +613,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
|
|||
// release global segment info
|
||||
r.node.historical.removeGlobalSegmentIDsByPartitionIds(r.req.PartitionIDs)
|
||||
|
||||
log.Debug("release partition task done",
|
||||
log.Debug("Release partition task done",
|
||||
zap.Any("collectionID", r.req.CollectionID),
|
||||
zap.Any("partitionIDs", r.req.PartitionIDs))
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue