improve query node log (#6897)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/6897/merge
bigsheeper, 2021-07-31 10:47:22 +08:00, committed by GitHub
parent ec22185ff4
commit e8ea8b51d5
20 changed files with 84 additions and 85 deletions
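
The change is mechanical across the query node: conditions the surrounding code already tolerates (a lookup that ends in an early return, a record that is skipped, a release that is retried) are downgraded from log.Error to log.Warn, leaving Error for failures that are not handled locally. A minimal sketch of that convention, using zap directly rather than Milvus's internal log wrapper; the helper and its "handled" flag are illustrative, not part of this commit:

package main

import "go.uber.org/zap"

// logLookupFailure is a hypothetical helper showing the level convention this
// commit applies: conditions the caller recovers from (skip, retry, early
// return) log at Warn; only unhandled failures remain at Error.
func logLookupFailure(log *zap.Logger, segmentID int64, err error, handled bool) {
	if err == nil {
		return
	}
	if handled {
		// e.g. segment not found: the caller skips this record and continues
		log.Warn("cannot find segment", zap.Int64("segmentID", segmentID), zap.Error(err))
		return
	}
	log.Error("segment lookup failed", zap.Int64("segmentID", segmentID), zap.Error(err))
}

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()
	logLookupFailure(log, 42, nil, true) // no-op: err is nil
}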

View File

@@ -74,7 +74,7 @@ func HandleCStatus(status *C.CStatus, extraInfo string) error {
 	finalMsg := fmt.Sprintf("[%s] %s", errorName, errorMsg)
 	logMsg := fmt.Sprintf("%s, C Runtime Exception: %s\n", extraInfo, finalMsg)
-	log.Error(logMsg)
+	log.Warn(logMsg)
 	return errors.New(finalMsg)
 }

View File

@@ -410,7 +410,7 @@ func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) er
 	key := fmt.Sprintf("%s/%d", queryNodeSegmentMetaPrefix, segmentID)
 	err = colReplica.etcdKV.Remove(key)
 	if err != nil {
-		log.Error("error when remove segment info from etcd")
+		log.Warn("error when remove segment info from etcd")
 	}
 	return nil

View File

@@ -112,14 +112,14 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
 	var schema schemapb.CollectionSchema
 	err := proto.Unmarshal(msg.Schema, &schema)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
 	// add collection
 	err = ddNode.replica.addCollection(collectionID, &schema)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
@@ -127,7 +127,7 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
 	// TODO: allocate default partition id in master
 	err = ddNode.replica.addPartition(collectionID, partitionID)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
@@ -156,7 +156,7 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
 	err := ddNode.replica.addPartition(collectionID, partitionID)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}

View File

@@ -128,7 +128,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
 	if fdmNode.loadType == loadTypeCollection {
 		col, err := fdmNode.replica.getCollectionByID(msg.CollectionID)
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			return nil
 		}
 		if err = col.checkReleasedPartitions([]UniqueID{msg.PartitionID}); err != nil {
@@ -142,7 +142,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
 	// so we need to compare the endTimestamp of received messages and position's timestamp.
 	excludedSegments, err := fdmNode.replica.getExcludedSegments(fdmNode.collectionID)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return nil
 	}
 	for _, segmentInfo := range excludedSegments {
@@ -156,7 +156,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
 	if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
 		// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
-		log.Error("Error, misaligned messages detected")
+		log.Warn("Error, misaligned messages detected")
 		return nil
 	}
@@ -184,7 +184,7 @@ func newFilteredDmNode(replica ReplicaInterface,
 	if loadType != loadTypeCollection && loadType != loadTypePartition {
 		err := errors.New("invalid flow graph type")
-		log.Error(err.Error())
+		log.Warn(err.Error())
 	}
 	return &filterDmNode{

View File

@@ -79,7 +79,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		if hasPartition := iNode.replica.hasPartition(task.PartitionID); !hasPartition {
 			err := iNode.replica.addPartition(task.CollectionID, task.PartitionID)
 			if err != nil {
-				log.Error(err.Error())
+				log.Warn(err.Error())
 				continue
 			}
 		}
@@ -88,7 +88,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		if !iNode.replica.hasSegment(task.SegmentID) {
 			err := iNode.replica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, task.ChannelID, segmentTypeGrowing, true)
 			if err != nil {
-				log.Error(err.Error())
+				log.Warn(err.Error())
 				continue
 			}
 		}
@@ -102,15 +102,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	for segmentID := range insertData.insertRecords {
 		var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
 		if err != nil {
-			log.Error("preInsert failed")
-			// TODO: add error handling
+			log.Warn(err.Error())
 		}
 		var numOfRecords = len(insertData.insertRecords[segmentID])
 		if targetSegment != nil {
 			offset, err := targetSegment.segmentPreInsert(numOfRecords)
 			if err != nil {
-				log.Error(err.Error())
+				log.Warn(err.Error())
 			}
 			insertData.insertOffset[segmentID] = offset
 			log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID))
@@ -144,7 +143,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
 		return
 	}
 	if err != nil {
-		log.Error("cannot find segment:", zap.Int64("segmentID", segmentID))
+		log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID))
 		// TODO: add error handling
 		wg.Done()
 		return

View File

@@ -105,7 +105,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
 func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode {
 	insertStream, err := factory.NewTtMsgStream(ctx)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 	} else {
 		q.dmlStream = insertStream
 	}

View File

@@ -73,7 +73,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 	//)
 	//if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil {
-	//	log.Error("Error: send time tick into pulsar channel failed", zap.Error(err))
+	//	log.Warn("Error: send time tick into pulsar channel failed", zap.Error(err))
 	//}
 	var res Msg = &gcMsg{
@@ -121,7 +121,7 @@ func newServiceTimeNode(ctx context.Context,
 	timeTimeMsgStream, err := factory.NewMsgStream(ctx)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 	} else {
 		// TODO: use param table
 		timeTickChannel := "query-node-time-tick-0"

View File

@@ -91,7 +91,7 @@ func (h *historical) watchGlobalSegmentMeta() {
 			for _, event := range resp.Events {
 				segmentID, err := strconv.ParseInt(filepath.Base(string(event.Kv.Key)), 10, 64)
 				if err != nil {
-					log.Error("watchGlobalSegmentMeta failed", zap.Any("error", err.Error()))
+					log.Warn("watchGlobalSegmentMeta failed", zap.Any("error", err.Error()))
 					continue
 				}
 				switch event.Type {
@@ -102,7 +102,7 @@ func (h *historical) watchGlobalSegmentMeta() {
 					segmentInfo := &querypb.SegmentInfo{}
 					err = proto.UnmarshalText(string(event.Kv.Value), segmentInfo)
 					if err != nil {
-						log.Error("watchGlobalSegmentMeta failed", zap.Any("error", err.Error()))
+						log.Warn("watchGlobalSegmentMeta failed", zap.Any("error", err.Error()))
 						continue
 					}
 					h.addGlobalSegmentInfo(segmentID, segmentInfo)

View File

@@ -211,7 +211,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    err.Error(),
 		}
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return status, err
 	}
 	log.Debug("watchDmChannelsTask Enqueue done", zap.Any("collectionID", in.CollectionID))
@@ -219,7 +219,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
 	func() {
 		err = dct.WaitToFinish()
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			return
 		}
 		log.Debug("watchDmChannelsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))
@@ -256,7 +256,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    err.Error(),
 		}
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return status, err
 	}
 	segmentIDs := make([]UniqueID, 0)
@@ -268,7 +268,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
 	func() {
 		err = dct.WaitToFinish()
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			return
 		}
 		log.Debug("loadSegmentsTask WaitToFinish done", zap.Int64s("segmentIDs", segmentIDs))
@@ -305,7 +305,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    err.Error(),
 		}
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return status, err
 	}
 	log.Debug("releaseCollectionTask Enqueue done", zap.Any("collectionID", in.CollectionID))
@@ -313,7 +313,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas
 	func() {
 		err = dct.WaitToFinish()
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			return
 		}
 		log.Debug("releaseCollectionTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))
@@ -350,7 +350,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
 			ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			Reason:    err.Error(),
 		}
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return status, err
 	}
 	log.Debug("releasePartitionsTask Enqueue done", zap.Any("collectionID", in.CollectionID))
@@ -358,7 +358,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
 	func() {
 		err = dct.WaitToFinish()
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			return
 		}
 		log.Debug("releasePartitionsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))

View File

@@ -85,7 +85,7 @@ type indexLoader struct {
 	// 	// sendQueryNodeStats
 	// 	err := loader.sendQueryNodeStats()
 	// 	if err != nil {
-	// 		log.Error(err.Error())
+	// 		log.Warn(err.Error())
 	// 		wg.Done()
 	// 		return
 	// 	}

View File

@@ -119,7 +119,7 @@ func (p *ParamTable) Init() {
 func (p *ParamTable) initQueryTimeTickChannelName() {
 	ch, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick")
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 	}
 	p.QueryTimeTickChannelName = ch
 }
@@ -240,7 +240,7 @@ func (p *ParamTable) initGracefulTime() {
 func (p *ParamTable) initMsgChannelSubName() {
 	namePrefix, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 	}
 	subName := namePrefix + "-" + strconv.FormatInt(p.QueryNodeID, 10)
 	p.MsgChannelSubName = subName

View File

@@ -128,7 +128,7 @@ func (q *queryCollection) close() {
 func (q *queryCollection) register() {
 	collection, err := q.streaming.replica.getCollectionByID(q.collectionID)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
@@ -166,7 +166,7 @@ func (q *queryCollection) waitNewTSafe() Timestamp {
 	// block until any vChannel updating tSafe
 	_, _, recvOK := reflect.Select(q.watcherSelectCase)
 	if !recvOK {
-		//log.Error("tSafe has been closed", zap.Any("collectionID", q.collectionID))
+		//log.Warn("tSafe has been closed", zap.Any("collectionID", q.collectionID))
 		return Timestamp(math.MaxInt64)
 	}
 	//log.Debug("wait new tSafe", zap.Any("collectionID", s.collectionID))
@@ -247,7 +247,7 @@ func (q *queryCollection) loadBalance(msg *msgstream.LoadBalanceSegmentsMsg) {
 	//		if nodeID == info.SourceNodeID {
 	//			err := s.historical.replica.removeSegment(segmentID)
 	//			if err != nil {
-	//				log.Error("loadBalance failed when remove segment",
+	//				log.Warn("loadBalance failed when remove segment",
 	//					zap.Error(err),
 	//					zap.Any("segmentID", segmentID))
 	//			}
@@ -255,7 +255,7 @@ func (q *queryCollection) loadBalance(msg *msgstream.LoadBalanceSegmentsMsg) {
 	//		if nodeID == info.DstNodeID {
 	//			segment, err := s.historical.replica.getSegmentByID(segmentID)
 	//			if err != nil {
-	//				log.Error("loadBalance failed when making segment on service",
+	//				log.Warn("loadBalance failed when making segment on service",
 	//					zap.Error(err),
 	//					zap.Any("segmentID", segmentID))
 	//				continue // not return, try to load balance all segment
@@ -290,11 +290,11 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) {
 	//)
 	default:
 		err := fmt.Errorf("receive invalid msgType = %d", msgType)
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
 	if collectionID != q.collectionID {
-		//log.Error("not target collection query request",
+		//log.Warn("not target collection query request",
 		//	zap.Any("collectionID", q.collectionID),
 		//	zap.Int64("target collectionID", collectionID),
 		//	zap.Int64("msgID", msg.ID()),
@@ -309,10 +309,10 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) {
 	// check if collection has been released
 	collection, err := q.historical.replica.getCollectionByID(collectionID)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		err = q.publishFailedQueryResult(msg, err.Error())
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 		} else {
 			log.Debug("do query failed in receiveQueryMsg, publish failed query result",
 				zap.Int64("collectionID", collectionID),
@@ -325,10 +325,10 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) {
 	guaranteeTs := msg.GuaranteeTs()
 	if guaranteeTs >= collection.getReleaseTime() {
 		err = fmt.Errorf("retrieve failed, collection has been released, msgID = %d, collectionID = %d", msg.ID(), collectionID)
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		err = q.publishFailedQueryResult(msg, err.Error())
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 		} else {
 			log.Debug("do query failed in receiveQueryMsg, publish failed query result",
 				zap.Int64("collectionID", collectionID),
@@ -375,16 +375,16 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) {
 		err = q.search(msg)
 	default:
 		err := fmt.Errorf("receive invalid msgType = %d", msgType)
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
 	tr.Record("operation done")
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		err = q.publishFailedQueryResult(msg, err.Error())
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 		} else {
 			log.Debug("do query failed in receiveQueryMsg, publish failed query result",
 				zap.Int64("collectionID", collectionID),
@@ -468,15 +468,15 @@ func (q *queryCollection) doUnsolvedQueryMsg() {
 			err = q.search(m)
 		default:
 			err := fmt.Errorf("receive invalid msgType = %d", msgType)
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			return
 		}
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			err = q.publishFailedQueryResult(m, err.Error())
 			if err != nil {
-				log.Error(err.Error())
+				log.Warn(err.Error())
 			} else {
 				log.Debug("do query failed in doUnsolvedMsg, publish failed query result",
 					zap.Int64("collectionID", q.collectionID),
@@ -849,7 +849,7 @@ func (q *queryCollection) search(msg queryMsg) error {
 	// historical search
 	hisSearchResults, hisSegmentResults, err1 := q.historical.search(searchRequests, collectionID, searchMsg.PartitionIDs, plan, travelTimestamp)
 	if err1 != nil {
-		log.Error(err1.Error())
+		log.Warn(err1.Error())
 		return err1
 	}
 	searchResults = append(searchResults, hisSearchResults...)
@@ -866,7 +866,7 @@ func (q *queryCollection) search(msg queryMsg) error {
 		var strSegmentResults []*Segment
 		strSearchResults, strSegmentResults, err2 = q.streaming.search(searchRequests, collectionID, searchMsg.PartitionIDs, channel, plan, travelTimestamp)
 		if err2 != nil {
-			log.Error(err2.Error())
+			log.Warn(err2.Error())
 			return err2
 		}
 		searchResults = append(searchResults, strSearchResults...)

View File

@@ -117,7 +117,7 @@ func (q *queryService) hasQueryCollection(collectionID UniqueID) bool {
 func (q *queryService) stopQueryCollection(collectionID UniqueID) {
 	sc, ok := q.queryCollections[collectionID]
 	if !ok {
-		log.Error("stopQueryCollection failed, collection doesn't exist", zap.Int64("collectionID", collectionID))
+		log.Warn("stopQueryCollection failed, collection doesn't exist", zap.Int64("collectionID", collectionID))
 		return
 	}
 	sc.close()

View File

@@ -67,7 +67,7 @@ func fillTargetEntry(plan *SearchPlan, searchResults []*SearchResult, matchedSeg
 		go func(i int) {
 			err := matchedSegments[i].fillTargetEntry(plan, searchResults[i])
 			if err != nil {
-				log.Error(err.Error())
+				log.Warn(err.Error())
 			}
 			wg.Done()
 		}(i)

View File

@@ -170,14 +170,14 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c
 	var segmentPtr C.CSegmentInterface
 	switch segType {
 	case segmentTypeInvalid:
-		log.Error("illegal segment type when create segment")
+		log.Warn("illegal segment type when create segment")
 		return nil
 	case segmentTypeSealed:
 		segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Sealed)
 	case segmentTypeGrowing:
 		segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Growing)
 	default:
-		log.Error("illegal segment type when create segment")
+		log.Warn("illegal segment type when create segment")
 		return nil
 	}

View File

@@ -101,13 +101,13 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
 		err = loader.loadSegmentInternal(collectionID, segment, info)
 		if err != nil {
 			deleteSegment(segment)
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			continue
 		}
 		err = loader.historicalReplica.setSegment(segment)
 		if err != nil {
 			deleteSegment(segment)
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			continue
 		}
 		if onService {
@@ -115,14 +115,14 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
 			value, err := loader.etcdKV.Load(key)
 			if err != nil {
 				deleteSegment(segment)
-				log.Error("error when load segment info from etcd", zap.Any("error", err.Error()))
+				log.Warn("error when load segment info from etcd", zap.Any("error", err.Error()))
 				continue
 			}
 			segmentInfo := &querypb.SegmentInfo{}
 			err = proto.UnmarshalText(value, segmentInfo)
 			if err != nil {
 				deleteSegment(segment)
-				log.Error("error when unmarshal segment info from etcd", zap.Any("error", err.Error()))
+				log.Warn("error when unmarshal segment info from etcd", zap.Any("error", err.Error()))
 				continue
 			}
 			segmentInfo.SegmentState = querypb.SegmentState_sealed
@@ -130,7 +130,7 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
 			err = loader.etcdKV.Save(newKey, proto.MarshalTextString(segmentInfo))
 			if err != nil {
 				deleteSegment(segment)
-				log.Error("error when update segment info to etcd", zap.Any("error", err.Error()))
+				log.Warn("error when update segment info to etcd", zap.Any("error", err.Error()))
 			}
 		}
 	}
@@ -220,7 +220,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
 	defer func() {
 		err := iCodec.Close()
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 		}
 	}()
 	blobs := make([]*storage.Blob, 0)
@@ -247,7 +247,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, fieldBinlog
 	_, _, insertData, err := iCodec.Deserialize(blobs)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return err
 	}
View File

@@ -143,13 +143,13 @@ func (s *streaming) search(searchReqs []*searchRequest,
 		zap.Any("segmentIDs", segIDs),
 	)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return searchResults, segmentResults, err
 	}
 	for _, segID := range segIDs {
 		seg, err := s.replica.getSegmentByID(segID)
 		if err != nil {
-			log.Error(err.Error())
+			log.Warn(err.Error())
 			return searchResults, segmentResults, err
 		}

View File

@@ -90,7 +90,7 @@ func (b *baseTask) Notify(err error) {
 // watchDmChannelsTask
 func (w *watchDmChannelsTask) Timestamp() Timestamp {
 	if w.req.Base == nil {
-		log.Error("nil base req in watchDmChannelsTask", zap.Any("collectionID", w.req.CollectionID))
+		log.Warn("nil base req in watchDmChannelsTask", zap.Any("collectionID", w.req.CollectionID))
 		return 0
 	}
 	return w.req.Base.Timestamp
@@ -217,7 +217,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 	}
 	err = w.node.streaming.replica.addExcludedSegments(collectionID, checkPointInfos)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return err
 	}
 	log.Debug("watchDMChannel, add check points info done", zap.Any("collectionID", collectionID))
@@ -262,7 +262,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		err := fg.consumerFlowGraph(VPChannels[channel], consumeSubName)
 		if err != nil {
 			errMsg := "msgStream consume error :" + err.Error()
-			log.Error(errMsg)
+			log.Warn(errMsg)
 			return errors.New(errMsg)
 		}
 	}
@@ -282,7 +282,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
 		err := fg.seekQueryNodeFlowGraph(pos)
 		if err != nil {
 			errMsg := "msgStream seek error :" + err.Error()
-			log.Error(errMsg)
+			log.Warn(errMsg)
 			return errors.New(errMsg)
 		}
 	}
@@ -316,7 +316,7 @@ func (w *watchDmChannelsTask) PostExecute(ctx context.Context) error {
 // loadSegmentsTask
 func (l *loadSegmentsTask) Timestamp() Timestamp {
 	if l.req.Base == nil {
-		log.Error("nil base req in loadSegmentsTask")
+		log.Warn("nil base req in loadSegmentsTask")
 		return 0
 	}
 	return l.req.Base.Timestamp
@@ -352,7 +352,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
 	}
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return err
 	}
@@ -382,7 +382,7 @@ func (l *loadSegmentsTask) PostExecute(ctx context.Context) error {
 // releaseCollectionTask
 func (r *releaseCollectionTask) Timestamp() Timestamp {
 	if r.req.Base == nil {
-		log.Error("nil base req in releaseCollectionTask", zap.Any("collectionID", r.req.CollectionID))
+		log.Warn("nil base req in releaseCollectionTask", zap.Any("collectionID", r.req.CollectionID))
 		return 0
 	}
 	return r.req.Base.Timestamp
@@ -405,7 +405,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
 	log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID))
 	collection, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return err
 	}
 	collection.setReleaseTime(r.req.Base.Timestamp)
@@ -435,7 +435,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
 		if hasCollectionInHistorical {
 			err := r.node.historical.replica.removeCollection(r.req.CollectionID)
 			if err != nil {
-				log.Error(errMsg + err.Error())
+				log.Warn(errMsg + err.Error())
 				return
 			}
 		}
@@ -444,7 +444,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
 		if hasCollectionInStreaming {
 			err := r.node.streaming.replica.removeCollection(r.req.CollectionID)
 			if err != nil {
-				log.Error(errMsg + err.Error())
+				log.Warn(errMsg + err.Error())
 				return
 			}
 		}
@@ -465,7 +465,7 @@ func (r *releaseCollectionTask) PostExecute(ctx context.Context) error {
 // releasePartitionsTask
 func (r *releasePartitionsTask) Timestamp() Timestamp {
 	if r.req.Base == nil {
-		log.Error("nil base req in releasePartitionsTask", zap.Any("collectionID", r.req.CollectionID))
+		log.Warn("nil base req in releasePartitionsTask", zap.Any("collectionID", r.req.CollectionID))
 		return 0
 	}
 	return r.req.Base.Timestamp
@@ -496,13 +496,13 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
 		hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
 		if err != nil {
-			log.Error(errMsg + err.Error())
+			log.Warn(errMsg + err.Error())
 			return
 		}
 		sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
 		if err != nil {
-			log.Error(errMsg + err.Error())
+			log.Warn(errMsg + err.Error())
 			return
 		}
@@ -526,7 +526,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
 			err = r.node.historical.replica.removePartition(id)
 			if err != nil {
 				// not return, try to release all partitions
-				log.Error(errMsg + err.Error())
+				log.Warn(errMsg + err.Error())
 			}
 		}
 		hCol.addReleasedPartition(id)
@@ -535,7 +535,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
 		if hasPartitionInStreaming {
 			err = r.node.streaming.replica.removePartition(id)
 			if err != nil {
-				log.Error(errMsg + err.Error())
+				log.Warn(errMsg + err.Error())
 			}
 		}
 		sCol.addReleasedPartition(id)

View File

@@ -44,7 +44,7 @@ func (s *taskScheduler) processTask(t task, q taskQueue) {
 		t.Notify(err)
 	}()
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
@@ -55,7 +55,7 @@ func (s *taskScheduler) processTask(t task, q taskQueue) {
 	err = t.Execute(s.ctx)
 	if err != nil {
-		log.Error(err.Error())
+		log.Warn(err.Error())
 		return
 	}
 	err = t.PostExecute(s.ctx)

View File

@@ -40,7 +40,7 @@ func (t *tSafeReplica) getTSafe(vChannel Channel) Timestamp {
 	defer t.mu.Unlock()
 	safer, err := t.getTSaferPrivate(vChannel)
 	if err != nil {
-		log.Error("get tSafe failed", zap.Error(err))
+		log.Warn("get tSafe failed", zap.Error(err))
 		return 0
 	}
 	return safer.get()
@@ -51,7 +51,7 @@ func (t *tSafeReplica) setTSafe(vChannel Channel, id UniqueID, timestamp Timesta
 	defer t.mu.Unlock()
 	safer, err := t.getTSaferPrivate(vChannel)
 	if err != nil {
-		log.Error("set tSafe failed", zap.Error(err))
+		log.Warn("set tSafe failed", zap.Error(err))
 		return
 	}
 	safer.set(id, timestamp)
@@ -60,7 +60,7 @@ func (t *tSafeReplica) setTSafe(vChannel Channel, id UniqueID, timestamp Timesta
 func (t *tSafeReplica) getTSaferPrivate(vChannel Channel) (tSafer, error) {
 	if _, ok := t.tSafes[vChannel]; !ok {
 		err := errors.New("cannot found tSafer, vChannel = " + vChannel)
-		//log.Error(err.Error())
+		//log.Warn(err.Error())
 		return nil, err
 	}
 	return t.tSafes[vChannel], nil
@@ -75,7 +75,7 @@ func (t *tSafeReplica) addTSafe(vChannel Channel) {
 		t.tSafes[vChannel].start()
 		log.Debug("add tSafe done", zap.Any("channel", vChannel))
 	} else {
-		log.Error("tSafe has been existed", zap.Any("channel", vChannel))
+		log.Warn("tSafe has been existed", zap.Any("channel", vChannel))
 	}
 }
@@ -98,7 +98,7 @@ func (t *tSafeReplica) registerTSafeWatcher(vChannel Channel, watcher *tSafeWatc
 	defer t.mu.Unlock()
 	safer, err := t.getTSaferPrivate(vChannel)
 	if err != nil {
-		log.Error("register tSafe watcher failed", zap.Error(err))
+		log.Warn("register tSafe watcher failed", zap.Error(err))
 		return
 	}
 	safer.registerTSafeWatcher(watcher)