mirror of https://github.com/milvus-io/milvus.git
Refine Data Log (#19996)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com> Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/20080/head
parent
3d316fc052
commit
192151bc11
|
@ -82,7 +82,7 @@ func (c *channelStateTimer) loadAllChannels(nodeID UniqueID) ([]*datapb.ChannelW
|
|||
// startOne can write ToWatch or ToRelease states.
|
||||
func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channelName string, nodeID UniqueID, timeoutTs int64) {
|
||||
if timeoutTs == 0 {
|
||||
log.Debug("zero timeoutTs, skip starting timer",
|
||||
log.Info("zero timeoutTs, skip starting timer",
|
||||
zap.String("watch state", watchState.String()),
|
||||
zap.Int64("nodeID", nodeID),
|
||||
zap.String("channel name", channelName),
|
||||
|
@ -94,14 +94,14 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
|
|||
c.runningTimers.Store(channelName, stop)
|
||||
timeoutT := time.Unix(0, timeoutTs)
|
||||
go func() {
|
||||
log.Debug("timer started",
|
||||
log.Info("timer started",
|
||||
zap.String("watch state", watchState.String()),
|
||||
zap.Int64("nodeID", nodeID),
|
||||
zap.String("channel name", channelName),
|
||||
zap.Time("timeout time", timeoutT))
|
||||
select {
|
||||
case <-time.NewTimer(time.Until(timeoutT)).C:
|
||||
log.Debug("timeout and stop timer: wait for channel ACK timeout",
|
||||
log.Info("timeout and stop timer: wait for channel ACK timeout",
|
||||
zap.String("watch state", watchState.String()),
|
||||
zap.Int64("nodeID", nodeID),
|
||||
zap.String("channel name", channelName),
|
||||
|
@ -109,7 +109,7 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
|
|||
ackType := getAckType(watchState)
|
||||
c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
|
||||
case <-stop:
|
||||
log.Debug("stop timer before timeout",
|
||||
log.Info("stop timer before timeout",
|
||||
zap.String("watch state", watchState.String()),
|
||||
zap.Int64("nodeID", nodeID),
|
||||
zap.String("channel name", channelName),
|
||||
|
|
|
@ -162,7 +162,7 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
|
|||
ctx1, cancel := context.WithCancel(ctx)
|
||||
c.stopChecker = cancel
|
||||
go c.stateChecker(ctx1)
|
||||
log.Debug("starting etcd states checker")
|
||||
log.Info("starting etcd states checker")
|
||||
}
|
||||
|
||||
log.Info("cluster start up",
|
||||
|
@ -196,7 +196,7 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
|
|||
for _, info := range watchInfos {
|
||||
channelName := info.GetVchan().GetChannelName()
|
||||
|
||||
log.Debug("processing watch info",
|
||||
log.Info("processing watch info",
|
||||
zap.String("watch state", info.GetState().String()),
|
||||
zap.String("channel name", channelName))
|
||||
|
||||
|
@ -387,7 +387,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
|
|||
for _, ch := range channels {
|
||||
chNames = append(chNames, ch.Name)
|
||||
}
|
||||
log.Debug("remove timers for channel of the deregistered node",
|
||||
log.Info("remove timers for channel of the deregistered node",
|
||||
zap.Any("channels", chNames), zap.Int64("nodeID", nodeID))
|
||||
c.stateTimer.removeTimers(chNames)
|
||||
|
||||
|
@ -557,7 +557,7 @@ func (c *ChannelManager) RemoveChannel(channelName string) error {
|
|||
func (c *ChannelManager) remove(nodeID int64, ch *channel) error {
|
||||
var op ChannelOpSet
|
||||
op.Delete(nodeID, []*channel{ch})
|
||||
log.Debug("remove channel assignment",
|
||||
log.Info("remove channel assignment",
|
||||
zap.Int64("nodeID to be removed", nodeID),
|
||||
zap.String("channelID", ch.Name),
|
||||
zap.Int64("collectionID", ch.CollectionID))
|
||||
|
@ -663,7 +663,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
|
|||
log.Info("watch etcd loop quit")
|
||||
return
|
||||
case ackEvent := <-timeoutWatcher:
|
||||
log.Debug("receive timeout acks from state watcher",
|
||||
log.Info("receive timeout acks from state watcher",
|
||||
zap.Any("state", ackEvent.ackType),
|
||||
zap.Int64("nodeID", ackEvent.nodeID), zap.String("channel name", ackEvent.channelName))
|
||||
c.processAck(ackEvent)
|
||||
|
@ -730,7 +730,7 @@ func (c *ChannelManager) Release(nodeID UniqueID, channelName string) error {
|
|||
toReleaseUpdates := getReleaseOp(nodeID, toReleaseChannel)
|
||||
err := c.updateWithTimer(toReleaseUpdates, datapb.ChannelWatchState_ToRelease)
|
||||
if err != nil {
|
||||
log.Debug("fail to update to release with timer", zap.Array("to release updates", toReleaseUpdates))
|
||||
log.Warn("fail to update to release with timer", zap.Array("to release updates", toReleaseUpdates))
|
||||
}
|
||||
|
||||
return err
|
||||
|
@ -802,7 +802,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
|
|||
return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error())
|
||||
}
|
||||
|
||||
log.Debug("try to cleanup removal flag ", zap.String("channel name", channelName))
|
||||
log.Info("try to cleanup removal flag ", zap.String("channel name", channelName))
|
||||
c.h.FinishDropChannel(channelName)
|
||||
|
||||
log.Info("removed channel assignment", zap.Any("channel", chToCleanUp))
|
||||
|
|
|
@ -174,7 +174,7 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
|
|||
c.executingTaskNum++
|
||||
|
||||
go func() {
|
||||
log.Debug("acquire queue", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
|
||||
log.Info("acquire queue", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
|
||||
c.acquireQueue(nodeID)
|
||||
|
||||
ts, err := c.allocator.allocTimestamp(context.TODO())
|
||||
|
@ -200,7 +200,7 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
|
|||
return
|
||||
}
|
||||
|
||||
log.Debug("start compaction", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
|
||||
log.Info("start compaction", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
@ -254,7 +254,7 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
|
|||
modInfos[i] = modSegments[i].SegmentInfo
|
||||
}
|
||||
|
||||
log.Debug("handleCompactionResult: altering metastore after compaction")
|
||||
log.Info("handleCompactionResult: altering metastore after compaction")
|
||||
if newSegment.GetNumOfRows() > 0 {
|
||||
if err := c.meta.alterMetaStoreAfterCompaction(modInfos, newSegment.SegmentInfo); err != nil {
|
||||
log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err))
|
||||
|
@ -273,7 +273,7 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
|
|||
StatsLogs: newSegment.GetStatslogs(),
|
||||
}
|
||||
|
||||
log.Debug("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
|
||||
log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
|
||||
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
|
||||
log.Warn("handleCompactionResult: fail to sync segments with node, reverting metastore",
|
||||
zap.Int64("nodeID", nodeID), zap.String("reason", err.Error()))
|
||||
|
@ -363,7 +363,7 @@ func (c *compactionPlanHandler) acquireQueue(nodeID int64) {
|
|||
}
|
||||
|
||||
func (c *compactionPlanHandler) releaseQueue(nodeID int64) {
|
||||
log.Debug("try to release queue", zap.Int64("nodeID", nodeID))
|
||||
log.Info("try to release queue", zap.Int64("nodeID", nodeID))
|
||||
ch, ok := c.parallelCh[nodeID]
|
||||
if !ok {
|
||||
return
|
||||
|
|
|
@ -101,13 +101,13 @@ func (m *meta) reloadFromKV() error {
|
|||
// AddCollection adds a collection into meta
|
||||
// Note that collection info is just for caching and will not be set into etcd from datacoord
|
||||
func (m *meta) AddCollection(collection *collectionInfo) {
|
||||
log.Debug("meta update: add collection",
|
||||
log.Info("meta update: add collection",
|
||||
zap.Int64("collection ID", collection.ID))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
m.collections[collection.ID] = collection
|
||||
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections)))
|
||||
log.Debug("meta update: add collection - complete",
|
||||
log.Info("meta update: add collection - complete",
|
||||
zap.Int64("collection ID", collection.ID))
|
||||
}
|
||||
|
||||
|
@ -211,7 +211,7 @@ func (m *meta) GetTotalBinlogSize() int64 {
|
|||
|
||||
// AddSegment records segment info, persisting info into kv store
|
||||
func (m *meta) AddSegment(segment *SegmentInfo) error {
|
||||
log.Debug("meta update: adding segment",
|
||||
log.Info("meta update: adding segment",
|
||||
zap.Int64("segment ID", segment.GetID()))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
@ -223,14 +223,14 @@ func (m *meta) AddSegment(segment *SegmentInfo) error {
|
|||
}
|
||||
m.segments.SetSegment(segment.GetID(), segment)
|
||||
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc()
|
||||
log.Debug("meta update: adding segment - complete",
|
||||
log.Info("meta update: adding segment - complete",
|
||||
zap.Int64("segment ID", segment.GetID()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// DropSegment remove segment with provided id, etcd persistence also removed
|
||||
func (m *meta) DropSegment(segmentID UniqueID) error {
|
||||
log.Debug("meta update: dropping segment",
|
||||
log.Info("meta update: dropping segment",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
@ -248,7 +248,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
|
|||
}
|
||||
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Inc()
|
||||
m.segments.DropSegment(segmentID)
|
||||
log.Debug("meta update: dropping segment - complete",
|
||||
log.Info("meta update: dropping segment - complete",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
@ -283,7 +283,7 @@ func (m *meta) GetAllSegmentsUnsafe() []*SegmentInfo {
|
|||
|
||||
// SetState setting segment with provided ID state
|
||||
func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) error {
|
||||
log.Debug("meta update: setting segment state",
|
||||
log.Info("meta update: setting segment state",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Any("target state", targetState))
|
||||
m.Lock()
|
||||
|
@ -319,7 +319,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
|
|||
}
|
||||
// Update in-memory meta.
|
||||
m.segments.SetState(segmentID, targetState)
|
||||
log.Debug("meta update: setting segment state - complete",
|
||||
log.Info("meta update: setting segment state - complete",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.String("target state", targetState.String()))
|
||||
return nil
|
||||
|
@ -327,7 +327,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
|
|||
|
||||
// UnsetIsImporting removes the `isImporting` flag of a segment.
|
||||
func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
|
||||
log.Debug("meta update: unsetting isImport state of segment",
|
||||
log.Info("meta update: unsetting isImport state of segment",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
@ -348,7 +348,7 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
|
|||
}
|
||||
// Update in-memory meta.
|
||||
m.segments.SetIsImporting(segmentID, false)
|
||||
log.Debug("meta update: unsetting isImport state of segment - complete",
|
||||
log.Info("meta update: unsetting isImport state of segment - complete",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
@ -365,7 +365,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
checkpoints []*datapb.CheckPoint,
|
||||
startPositions []*datapb.SegmentStartPosition,
|
||||
) error {
|
||||
log.Debug("meta update: update flush segments info",
|
||||
log.Info("meta update: update flush segments info",
|
||||
zap.Int64("segmentId", segmentID),
|
||||
zap.Int("binlog", len(binlogs)),
|
||||
zap.Int("stats log", len(statslogs)),
|
||||
|
@ -497,7 +497,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
for id, s := range modSegments {
|
||||
m.segments.SetSegment(id, s)
|
||||
}
|
||||
log.Debug("meta update: update flush segments info - update flush segments info successfully",
|
||||
log.Info("meta update: update flush segments info - update flush segments info successfully",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
@ -505,7 +505,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
// UpdateDropChannelSegmentInfo updates segment checkpoints and binlogs before drop
|
||||
// reusing segment info to pass segment id, binlogs, statslog, deltalog, start position and checkpoint
|
||||
func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentInfo) error {
|
||||
log.Debug("meta update: update drop channel segment info",
|
||||
log.Info("meta update: update drop channel segment info",
|
||||
zap.String("channel", channel))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
@ -548,7 +548,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
|
|||
zap.String("channel", channel),
|
||||
zap.Error(err))
|
||||
} else {
|
||||
log.Debug("meta update: update drop channel segment info - complete",
|
||||
log.Info("meta update: update drop channel segment info - complete",
|
||||
zap.String("channel", channel))
|
||||
}
|
||||
return err
|
||||
|
@ -794,7 +794,7 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
|
|||
|
||||
// AddAllocation add allocation in segment
|
||||
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
|
||||
log.Debug("meta update: add allocation",
|
||||
log.Info("meta update: add allocation",
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.Any("allocation", allocation))
|
||||
m.Lock()
|
||||
|
@ -818,7 +818,7 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
|
|||
}
|
||||
// Update in-memory meta.
|
||||
m.segments.AddAllocation(segmentID, allocation)
|
||||
log.Debug("meta update: add allocation - complete",
|
||||
log.Info("meta update: add allocation - complete",
|
||||
zap.Int64("segmentID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
@ -931,7 +931,7 @@ func (m *meta) GetCompleteCompactionMeta(compactionLogs []*datapb.CompactionSegm
|
|||
}
|
||||
segment := NewSegmentInfo(segmentInfo)
|
||||
|
||||
log.Debug("meta update: get complete compaction meta - complete",
|
||||
log.Info("meta update: get complete compaction meta - complete",
|
||||
zap.Int64("segmentID", segmentInfo.ID),
|
||||
zap.Int64("collectionID", segmentInfo.CollectionID),
|
||||
zap.Int64("partitionID", segmentInfo.PartitionID),
|
||||
|
@ -946,7 +946,7 @@ func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo,
|
|||
}
|
||||
|
||||
func (m *meta) revertAlterMetaStoreAfterCompaction(oldSegments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error {
|
||||
log.Debug("revert metastore after compaction failure",
|
||||
log.Info("revert metastore after compaction failure",
|
||||
zap.Int64("collectionID", removalSegment.CollectionID),
|
||||
zap.Int64("partitionID", removalSegment.PartitionID),
|
||||
zap.Int64("compactedTo", removalSegment.ID),
|
||||
|
@ -960,7 +960,7 @@ func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, s
|
|||
for _, v := range segmentsCompactFrom {
|
||||
compactFromIDs = append(compactFromIDs, v.GetID())
|
||||
}
|
||||
log.Debug("meta update: alter in memory meta after compaction",
|
||||
log.Info("meta update: alter in memory meta after compaction",
|
||||
zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
|
||||
zap.Int64s("compact from segment IDs", compactFromIDs))
|
||||
m.Lock()
|
||||
|
@ -974,7 +974,7 @@ func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, s
|
|||
if segmentCompactTo.GetNumOfRows() > 0 {
|
||||
m.segments.SetSegment(segmentCompactTo.GetID(), segmentCompactTo)
|
||||
}
|
||||
log.Debug("meta update: alter in memory meta after compaction - complete",
|
||||
log.Info("meta update: alter in memory meta after compaction - complete",
|
||||
zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
|
||||
zap.Int64s("compact from segment IDs", compactFromIDs))
|
||||
}
|
||||
|
|
|
@ -311,14 +311,14 @@ func (s *Server) Start() error {
|
|||
log.Info("datacoord switch from standby to active, activating")
|
||||
s.startServerLoop()
|
||||
s.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
logutil.Logger(s.ctx).Debug("startup success")
|
||||
logutil.Logger(s.ctx).Info("startup success")
|
||||
}
|
||||
s.stateCode.Store(commonpb.StateCode_StandBy)
|
||||
logutil.Logger(s.ctx).Debug("DataCoord enter standby mode successfully")
|
||||
logutil.Logger(s.ctx).Info("DataCoord enter standby mode successfully")
|
||||
} else {
|
||||
s.startServerLoop()
|
||||
s.stateCode.Store(commonpb.StateCode_Healthy)
|
||||
logutil.Logger(s.ctx).Debug("DataCoord startup successfully")
|
||||
logutil.Logger(s.ctx).Info("DataCoord startup successfully")
|
||||
}
|
||||
|
||||
Params.DataCoordCfg.CreatedTime = time.Now()
|
||||
|
@ -603,7 +603,7 @@ func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
|
|||
// Log if # of rows is updated.
|
||||
if s.meta.GetSegmentUnsafe(stat.GetSegmentID()) != nil &&
|
||||
s.meta.GetSegmentUnsafe(stat.GetSegmentID()).GetNumOfRows() != stat.GetNumRows() {
|
||||
log.Debug("Updating segment number of rows",
|
||||
log.Info("Updating segment number of rows",
|
||||
zap.Int64("segment ID", stat.GetSegmentID()),
|
||||
zap.Int64("old value", s.meta.GetSegmentUnsafe(stat.GetSegmentID()).GetNumOfRows()),
|
||||
zap.Int64("new value", stat.GetNumRows()),
|
||||
|
@ -784,7 +784,7 @@ func (s *Server) startFlushLoop(ctx context.Context) {
|
|||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logutil.Logger(s.ctx).Debug("flush loop shutdown")
|
||||
logutil.Logger(s.ctx).Info("flush loop shutdown")
|
||||
return
|
||||
case segmentID := <-s.flushCh:
|
||||
//Ignore return error
|
||||
|
@ -845,7 +845,7 @@ func (s *Server) Stop() error {
|
|||
if !s.stateCode.CompareAndSwap(commonpb.StateCode_Healthy, commonpb.StateCode_Abnormal) {
|
||||
return nil
|
||||
}
|
||||
logutil.Logger(s.ctx).Debug("server shutdown")
|
||||
logutil.Logger(s.ctx).Info("server shutdown")
|
||||
s.cluster.Close()
|
||||
s.garbageCollector.close()
|
||||
s.stopServerLoop()
|
||||
|
|
|
@ -279,7 +279,7 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
|
|||
// for now only row count is returned
|
||||
func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
|
||||
ctx = logutil.WithModule(ctx, moduleName)
|
||||
logutil.Logger(ctx).Debug("received request to get collection statistics")
|
||||
logutil.Logger(ctx).Info("received request to get collection statistics")
|
||||
resp := &datapb.GetCollectionStatisticsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
@ -292,7 +292,7 @@ func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol
|
|||
nums := s.meta.GetNumRowsOfCollection(req.CollectionID)
|
||||
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
||||
resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
|
||||
logutil.Logger(ctx).Debug("success to get collection statistics", zap.Any("response", resp))
|
||||
logutil.Logger(ctx).Info("success to get collection statistics", zap.Any("response", resp))
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -319,7 +319,7 @@ func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart
|
|||
}
|
||||
resp.Status.ErrorCode = commonpb.ErrorCode_Success
|
||||
resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
|
||||
logutil.Logger(ctx).Debug("success to get partition statistics", zap.Any("response", resp))
|
||||
logutil.Logger(ctx).Info("success to get partition statistics", zap.Any("response", resp))
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -614,7 +614,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
|
|||
for _, c := range channels {
|
||||
channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionID)
|
||||
channelInfos = append(channelInfos, channelInfo)
|
||||
log.Debug("datacoord append channelInfo in GetRecoveryInfo",
|
||||
log.Info("datacoord append channelInfo in GetRecoveryInfo",
|
||||
zap.Any("channelInfo", channelInfo),
|
||||
)
|
||||
flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...)
|
||||
|
@ -712,7 +712,7 @@ func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS
|
|||
}
|
||||
collectionID := req.GetCollectionID()
|
||||
partitionID := req.GetPartitionID()
|
||||
log.Debug("received get flushed segments request",
|
||||
log.Info("received get flushed segments request",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("partitionID", partitionID),
|
||||
)
|
||||
|
@ -755,7 +755,7 @@ func (s *Server) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment
|
|||
collectionID := req.GetCollectionID()
|
||||
partitionID := req.GetPartitionID()
|
||||
states := req.GetStates()
|
||||
log.Debug("received get segments by states request",
|
||||
log.Info("received get segments by states request",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("partitionID", partitionID),
|
||||
zap.Any("states", states))
|
||||
|
|
|
@ -155,7 +155,7 @@ func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segme
|
|||
}
|
||||
indexed := extractSegmentsWithVectorIndex(vecFieldID, resp.GetSegmentInfo())
|
||||
if len(indexed) == 0 {
|
||||
log.Debug("no vector index for the segment",
|
||||
log.Info("no vector index for the segment",
|
||||
zap.Int64("collectionID", segment.GetCollectionID()),
|
||||
zap.Int64("segmentID", segment.GetID()))
|
||||
return
|
||||
|
|
|
@ -175,7 +175,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
|
|||
}
|
||||
|
||||
dbuff.updateSize(dbuff.delData.RowCount)
|
||||
log.Debug("mergeDeltalogs end",
|
||||
log.Info("mergeDeltalogs end",
|
||||
zap.Int("number of pks to compact in insert logs", len(pk2ts)),
|
||||
zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
|
||||
|
||||
|
@ -412,7 +412,7 @@ func (t *compactionTask) merge(
|
|||
uploadStatsTimeCost += time.Since(uploadStatsStart)
|
||||
}
|
||||
|
||||
log.Debug("merge end", zap.Int64("remaining insert numRows", numRows),
|
||||
log.Info("merge end", zap.Int64("remaining insert numRows", numRows),
|
||||
zap.Int64("expired entities", expired), zap.Int("binlog file number", numBinlogs),
|
||||
zap.Float64("download insert log elapse in ms", nano2Milli(downloadTimeCost)),
|
||||
zap.Float64("upload insert log elapse in ms", nano2Milli(uploadInsertTimeCost)),
|
||||
|
@ -452,7 +452,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
|||
}
|
||||
}
|
||||
|
||||
log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
|
||||
log.Info("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
|
||||
segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs()))
|
||||
for _, s := range t.plan.GetSegmentBinlogs() {
|
||||
segIDs = append(segIDs, s.GetSegmentID())
|
||||
|
@ -475,7 +475,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
|||
<-ti.Injected()
|
||||
injectEnd := time.Now()
|
||||
defer func() {
|
||||
log.Debug("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(injectEnd.Sub(injectStart))))
|
||||
log.Info("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(injectEnd.Sub(injectStart))))
|
||||
}()
|
||||
|
||||
var (
|
||||
|
@ -536,7 +536,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
|||
err = g.Wait()
|
||||
downloadEnd := time.Now()
|
||||
defer func() {
|
||||
log.Debug("download deltalogs elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(downloadEnd.Sub(downloadStart))))
|
||||
log.Info("download deltalogs elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(downloadEnd.Sub(downloadStart))))
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
|
@ -561,7 +561,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
|||
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
log.Debug("upload delta log elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(uploadDeltaStart))))
|
||||
log.Info("upload delta log elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(uploadDeltaStart))))
|
||||
|
||||
for _, fbl := range deltaInfo {
|
||||
for _, deltaLogInfo := range fbl.GetBinlogs() {
|
||||
|
@ -586,7 +586,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
|||
ti.injectDone(true)
|
||||
uninjectEnd := time.Now()
|
||||
defer func() {
|
||||
log.Debug("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
|
||||
log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
|
||||
}()
|
||||
|
||||
log.Info("compaction done",
|
||||
|
|
|
@ -348,7 +348,7 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
|
|||
}
|
||||
|
||||
if isEndWatchState(watchInfo.State) {
|
||||
log.Debug("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
|
||||
log.Info("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -474,7 +474,7 @@ func (node *DataNode) Start() error {
|
|||
log.Error("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.DataNodeRole))
|
||||
return err
|
||||
}
|
||||
log.Debug("start id allocator done", zap.String("role", typeutil.DataNodeRole))
|
||||
log.Info("start id allocator done", zap.String("role", typeutil.DataNodeRole))
|
||||
|
||||
rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
|
@ -877,7 +877,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
|
|||
node.compactionExecutor.completed.Delete(k)
|
||||
return true
|
||||
})
|
||||
log.Debug("Compaction results", zap.Any("results", results))
|
||||
log.Info("Compaction results", zap.Any("results", results))
|
||||
return &datapb.CompactionStateResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
Results: results,
|
||||
|
@ -1080,7 +1080,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
|
|||
if err != nil {
|
||||
return returnFailFunc(err)
|
||||
}
|
||||
log.Debug("import time range", zap.Uint64("start_ts", tsStart), zap.Uint64("end_ts", tsEnd))
|
||||
log.Info("import time range", zap.Uint64("start_ts", tsStart), zap.Uint64("end_ts", tsEnd))
|
||||
err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false)
|
||||
//err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false, tsStart, tsEnd)
|
||||
if err != nil {
|
||||
|
|
|
@ -342,7 +342,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppe
|
|||
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
|
||||
deltaStream.AsProducer([]string{deltaChannelName})
|
||||
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc()
|
||||
log.Debug("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName))
|
||||
log.Info("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName))
|
||||
var deltaMsgStream msgstream.MsgStream = deltaStream
|
||||
deltaMsgStream.Start()
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
|
|||
|
||||
// process flush messages
|
||||
if len(fgMsg.segmentsToFlush) > 0 {
|
||||
log.Debug("DeleteNode receives flush message",
|
||||
log.Info("DeleteNode receives flush message",
|
||||
zap.Int64s("segIDs", fgMsg.segmentsToFlush),
|
||||
zap.String("vChannelName", dn.channelName))
|
||||
for _, segmentToFlush := range fgMsg.segmentsToFlush {
|
||||
|
@ -149,7 +149,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
|
|||
// process drop collection message, delete node shall notify flush manager all data are cleared and send signal to DataSyncService cleaner
|
||||
if fgMsg.dropCollection {
|
||||
dn.flushManager.notifyAllFlushed()
|
||||
log.Debug("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName))
|
||||
log.Info("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName))
|
||||
dn.clearSignal <- dn.channelName
|
||||
}
|
||||
|
||||
|
@ -191,7 +191,7 @@ func (dn *deleteNode) updateCompactedSegments() {
|
|||
}
|
||||
}
|
||||
}
|
||||
log.Debug("update delBuf for compacted segments",
|
||||
log.Info("update delBuf for compacted segments",
|
||||
zap.Int64("compactedTo segmentID", compactedTo),
|
||||
zap.Int64s("compactedFrom segmentIDs", compactedFrom),
|
||||
)
|
||||
|
|
|
@ -362,7 +362,7 @@ func (ibNode *insertBufferNode) Flush(fgMsg *flowGraphMsg, seg2Upload []UniqueID
|
|||
segmentsToFlush := make([]UniqueID, 0, len(flushTasks))
|
||||
|
||||
for _, task := range flushTasks {
|
||||
log.Debug("insertBufferNode flushing BufferData",
|
||||
log.Info("insertBufferNode flushing BufferData",
|
||||
zap.Int64("segmentID", task.segmentID),
|
||||
zap.Bool("flushed", task.flushed),
|
||||
zap.Bool("dropped", task.dropped),
|
||||
|
@ -585,7 +585,7 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
|
|||
}
|
||||
wTt.AsProducer([]string{Params.CommonCfg.DataCoordTimeTick})
|
||||
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc()
|
||||
log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick))
|
||||
log.Info("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick))
|
||||
var wTtMsgStream msgstream.MsgStream = wTt
|
||||
wTtMsgStream.Start()
|
||||
|
||||
|
|
|
@ -136,7 +136,7 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush
|
|||
t.init(q.notifyFunc, q.postTask, q.tailCh)
|
||||
q.tailCh = t.finishSignal
|
||||
q.tailMut.Unlock()
|
||||
log.Debug("new flush task runner created and initialized",
|
||||
log.Info("new flush task runner created and initialized",
|
||||
zap.Int64("segment ID", q.segmentID),
|
||||
zap.String("pos message ID", string(pos.GetMsgID())),
|
||||
)
|
||||
|
@ -284,7 +284,7 @@ func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQu
|
|||
}
|
||||
|
||||
func (m *rendezvousFlushManager) handleInsertTask(segmentID UniqueID, task flushInsertTask, binlogs, statslogs map[UniqueID]*datapb.Binlog, flushed bool, dropped bool, pos *internalpb.MsgPosition) {
|
||||
log.Debug("handling insert task",
|
||||
log.Info("handling insert task",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Bool("flushed", flushed),
|
||||
zap.Bool("dropped", dropped),
|
||||
|
|
|
@ -129,7 +129,7 @@ func (t *flushTaskRunner) runFlushInsert(task flushInsertTask,
|
|||
t.flushed = flushed
|
||||
t.pos = pos
|
||||
t.dropped = dropped
|
||||
log.Debug("running flush insert task",
|
||||
log.Info("running flush insert task",
|
||||
zap.Int64("segment ID", t.segmentID),
|
||||
zap.Bool("flushed", flushed),
|
||||
zap.Bool("dropped", dropped),
|
||||
|
|
Loading…
Reference in New Issue