From b8b2e8eb921f029996795d483dade93bd48005b8 Mon Sep 17 00:00:00 2001 From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> Date: Fri, 30 Sep 2022 11:48:55 +0800 Subject: [PATCH] Add logs to better track DataCoord meta update (#19485) /kind improvement Signed-off-by: Yuchen Gao Signed-off-by: Yuchen Gao --- internal/datacoord/meta.go | 131 +++++++++++++++++++++++++------------ 1 file changed, 89 insertions(+), 42 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index fba8e846ef..866b1cf86b 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -87,10 +87,14 @@ func (m *meta) reloadFromKV() error { // AddCollection adds a collection into meta // Note that collection info is just for caching and will not be set into etcd from datacoord func (m *meta) AddCollection(collection *datapb.CollectionInfo) { + log.Info("meta update: add collection", + zap.Int64("collection ID", collection.GetID())) m.Lock() defer m.Unlock() m.collections[collection.ID] = collection metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections))) + log.Info("meta update: add collection - complete", + zap.Int64("collection ID", collection.GetID())) } // GetCollection returns collection info with provided collection id from local cache @@ -106,13 +110,13 @@ func (m *meta) GetCollection(collectionID UniqueID) *datapb.CollectionInfo { // chanPartSegments is an internal result struct, which is aggregates of SegmentInfos with same collectionID, partitionID and channelName type chanPartSegments struct { - collecionID UniqueID - partitionID UniqueID - channelName string - segments []*SegmentInfo + collectionID UniqueID + partitionID UniqueID + channelName string + segments []*SegmentInfo } -// GetSegmentsChanPart returns segments organized in Channel-Parition dimension with selector applied +// GetSegmentsChanPart returns segments organized in Channel-Partition dimension with selector applied func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegments { m.RLock() defer m.RUnlock() @@ -126,9 +130,9 @@ func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegm entry, ok := mDimEntry[dim] if !ok { entry = &chanPartSegments{ - collecionID: segmentInfo.CollectionID, - partitionID: segmentInfo.PartitionID, - channelName: segmentInfo.InsertChannel, + collectionID: segmentInfo.CollectionID, + partitionID: segmentInfo.PartitionID, + channelName: segmentInfo.InsertChannel, } mDimEntry[dim] = entry } @@ -158,29 +162,45 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 { // AddSegment records segment info, persisting info into kv store func (m *meta) AddSegment(segment *SegmentInfo) error { + log.Info("meta update: adding segment", + zap.Int64("segment ID", segment.GetID())) m.Lock() defer m.Unlock() if err := m.catalog.AddSegment(m.ctx, segment.SegmentInfo); err != nil { + log.Error("meta update: adding segment failed", + zap.Int64("segment ID", segment.GetID()), + zap.Error(err)) return err } m.segments.SetSegment(segment.GetID(), segment) metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc() + log.Info("meta update: adding segment - complete", + zap.Int64("segment ID", segment.GetID())) return nil } // DropSegment remove segment with provided id, etcd persistence also removed func (m *meta) DropSegment(segmentID UniqueID) error { + log.Info("meta update: dropping segment", + zap.Int64("segment ID", segmentID)) m.Lock() defer m.Unlock() segment := m.segments.GetSegment(segmentID) if segment == nil { + log.Warn("meta update: dropping segment failed - segment not found", + zap.Int64("segment ID", segmentID)) return nil } if err := m.catalog.DropSegment(m.ctx, segment.SegmentInfo); err != nil { + log.Warn("meta update: dropping segment failed", + zap.Int64("segment ID", segmentID), + zap.Error(err)) return err } metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Inc() m.segments.DropSegment(segmentID) + log.Info("meta update: dropping segment - complete", + zap.Int64("segment ID", segmentID)) return nil } @@ -218,11 +238,17 @@ func (m *meta) GetAllSegment(segID UniqueID) *SegmentInfo { // SetState setting segment with provided ID state func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) error { + log.Info("meta update: setting segment state", + zap.Int64("segment ID", segmentID), + zap.Any("target state", targetState)) m.Lock() defer m.Unlock() curSegInfo := m.segments.GetSegment(segmentID) if curSegInfo == nil { - // TODO: Should return error instead. + log.Warn("meta update: setting segment state - segment not found", + zap.Int64("segment ID", segmentID), + zap.Any("target state", targetState)) + // TODO: Should probably return error instead. return nil } // Persist segment updates first. @@ -231,9 +257,9 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e oldState := curSegInfo.GetState() if clonedSegment != nil && isSegmentHealthy(clonedSegment) { if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { - log.Error("failed to alter segments", + log.Error("meta update: setting segment state - failed to alter segments", zap.Int64("segment ID", segmentID), - zap.Any("target state", targetState), + zap.String("target state", targetState.String()), zap.Error(err)) return err } @@ -248,11 +274,16 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e } // Update in-memory meta. m.segments.SetState(segmentID, targetState) + log.Info("meta update: setting segment state - complete", + zap.Int64("segment ID", segmentID), + zap.String("target state", targetState.String())) return nil } // UnsetIsImporting removes the `isImporting` flag of a segment. func (m *meta) UnsetIsImporting(segmentID UniqueID) error { + log.Info("meta update: unsetting isImport state of segment", + zap.Int64("segment ID", segmentID)) m.Lock() defer m.Unlock() curSegInfo := m.segments.GetSegment(segmentID) @@ -263,10 +294,8 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error { clonedSegment := curSegInfo.Clone() clonedSegment.IsImporting = false if isSegmentHealthy(clonedSegment) { - log.Info("unsetting isImport state of segment", - zap.Int64("segment ID", segmentID)) if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { - log.Error("failed to unset segment isImporting state", + log.Error("meta update: unsetting isImport state of segment - failed to unset segment isImporting state", zap.Int64("segment ID", segmentID), zap.Error(err)) return err @@ -274,6 +303,8 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error { } // Update in-memory meta. m.segments.SetIsImporting(segmentID, false) + log.Info("meta update: unsetting isImport state of segment - complete", + zap.Int64("segment ID", segmentID)) return nil } @@ -289,10 +320,7 @@ func (m *meta) UpdateFlushSegmentsInfo( checkpoints []*datapb.CheckPoint, startPositions []*datapb.SegmentStartPosition, ) error { - m.Lock() - defer m.Unlock() - - log.Info("update flush segments info", + log.Info("meta update: update flush segments info", zap.Int64("segmentId", segmentID), zap.Int("binlog", len(binlogs)), zap.Int("stats log", len(statslogs)), @@ -302,24 +330,28 @@ func (m *meta) UpdateFlushSegmentsInfo( zap.Any("check points", checkpoints), zap.Any("start position", startPositions), zap.Bool("importing", importing)) + m.Lock() + defer m.Unlock() + segment := m.segments.GetSegment(segmentID) if importing { m.segments.SetRowCount(segmentID, segment.currRows) segment = m.segments.GetSegment(segmentID) } if segment == nil || !isSegmentHealthy(segment) { + log.Warn("meta update: update flush segments info - segment not found", + zap.Int64("segment ID", segmentID), + zap.Bool("segment nil", segment == nil), + zap.Bool("segment unhealthy", !isSegmentHealthy(segment))) return nil } clonedSegment := segment.Clone() - modSegments := make(map[UniqueID]*SegmentInfo) - if flushed { clonedSegment.State = commonpb.SegmentState_Flushing modSegments[segmentID] = clonedSegment } - if dropped { clonedSegment.State = commonpb.SegmentState_Dropped clonedSegment.DroppedAt = uint64(time.Now().UnixNano()) @@ -327,7 +359,6 @@ func (m *meta) UpdateFlushSegmentsInfo( } // TODO add diff encoding and compression currBinlogs := clonedSegment.GetBinlogs() - var getFieldBinlogs = func(id UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog { for _, binlog := range binlogs { if id == binlog.GetFieldID() { @@ -361,9 +392,7 @@ func (m *meta) UpdateFlushSegmentsInfo( } } clonedSegment.Deltalogs = currDeltaLogs - modSegments[segmentID] = clonedSegment - var getClonedSegment = func(segmentID UniqueID) *SegmentInfo { if s, ok := modSegments[segmentID]; ok { return s @@ -373,7 +402,6 @@ func (m *meta) UpdateFlushSegmentsInfo( } return nil } - for _, pos := range startPositions { if len(pos.GetStartPosition().GetMsgID()) == 0 { continue @@ -386,7 +414,6 @@ func (m *meta) UpdateFlushSegmentsInfo( s.StartPosition = pos.GetStartPosition() modSegments[pos.GetSegmentID()] = s } - for _, cp := range checkpoints { s := getClonedSegment(cp.GetSegmentID()) if s == nil { @@ -402,13 +429,13 @@ func (m *meta) UpdateFlushSegmentsInfo( s.NumOfRows = cp.GetNumOfRows() modSegments[cp.GetSegmentID()] = s } - segments := make([]*datapb.SegmentInfo, 0, len(modSegments)) for _, seg := range modSegments { segments = append(segments, seg.SegmentInfo) } if err := m.catalog.AlterSegments(m.ctx, segments); err != nil { - log.Error("failed to store flush segment info into Etcd", zap.Error(err)) + log.Error("meta update: update flush segments info - failed to store flush segment info into Etcd", + zap.Error(err)) return err } oldSegmentState := segment.GetState() @@ -425,19 +452,20 @@ func (m *meta) UpdateFlushSegmentsInfo( for id, s := range modSegments { m.segments.SetSegment(id, s) } - log.Info("update flush segments info successfully", zap.Int64("segmentId", segmentID)) + log.Info("meta update: update flush segments info - update flush segments info successfully", + zap.Int64("segment ID", segmentID)) return nil } // UpdateDropChannelSegmentInfo updates segment checkpoints and binlogs before drop // reusing segment info to pass segment id, binlogs, statslog, deltalog, start position and checkpoint func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentInfo) error { + log.Info("meta update: update drop channel segment info", + zap.String("channel", channel)) m.Lock() defer m.Unlock() - modSegments := make(map[UniqueID]*SegmentInfo) originSegments := make(map[UniqueID]*SegmentInfo) - // save new segments flushed from buffer data for _, seg2Drop := range segments { segment := m.mergeDropSegment(seg2Drop) @@ -470,6 +498,14 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI } } } + if err != nil { + log.Info("meta update: update drop channel segment info failed", + zap.String("channel", channel), + zap.Error(err)) + } else { + log.Info("meta update: update drop channel segment info - complete", + zap.String("channel", channel)) + } return err } @@ -739,18 +775,23 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo { // AddAllocation add allocation in segment func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { + log.Info("meta update: add allocation", + zap.Int64("segmentID", segmentID), + zap.Any("allocation", allocation)) m.Lock() defer m.Unlock() curSegInfo := m.segments.GetSegment(segmentID) if curSegInfo == nil { // TODO: Error handling. + log.Warn("meta update: add allocation failed - segment not found", + zap.Int64("segmentID", segmentID)) return nil } // Persist segment updates first. clonedSegment := curSegInfo.Clone(AddAllocation(allocation)) if clonedSegment != nil && isSegmentHealthy(clonedSegment) { if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { - log.Error("failed to add allocation for segment", + log.Error("meta update: add allocation failed", zap.Int64("segment ID", segmentID), zap.Error(err)) return err @@ -758,6 +799,8 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { } // Update in-memory meta. m.segments.AddAllocation(segmentID, allocation) + log.Info("meta update: add allocation - complete", + zap.Int64("segmentID", segmentID)) return nil } @@ -793,20 +836,13 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { m.segments.SetIsCompacting(segmentID, compacting) } -// SetSegmentIsImporting sets the importing state for a segment. -func (m *meta) SetSegmentIsImporting(segmentID UniqueID, importing bool) { - m.Lock() - defer m.Unlock() - - m.segments.SetIsImporting(segmentID, importing) -} - -// GetCompleteComapctionMeta returns +// GetCompleteCompactionMeta returns // - the segment info of compactedFrom segments before compaction to revert // - the segment info of compactedFrom segments after compaction to alter // - the segment info of compactedTo segment after compaction to add // The compactedTo segment could contain 0 numRows func (m *meta) GetCompleteCompactionMeta(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo) { + log.Info("meta update: get complete compaction meta") m.Lock() defer m.Unlock() @@ -876,7 +912,8 @@ func (m *meta) GetCompleteCompactionMeta(compactionLogs []*datapb.CompactionSegm } segment := NewSegmentInfo(segmentInfo) - log.Info("GetCompleteCompactionMeta", zap.Int64("segmentID", segmentInfo.ID), + log.Info("meta update: get complete compaction meta - complete", + zap.Int64("segmentID", segmentInfo.ID), zap.Int64("collectionID", segmentInfo.CollectionID), zap.Int64("partitionID", segmentInfo.PartitionID), zap.Int64("NumOfRows", segmentInfo.NumOfRows), @@ -900,6 +937,13 @@ func (m *meta) revertAlterMetaStoreAfterCompaction(oldSegments []*datapb.Segment } func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) { + var compactFromIDs []int64 + for _, v := range segmentsCompactFrom { + compactFromIDs = append(compactFromIDs, v.GetID()) + } + log.Info("meta update: alter in memory meta after compaction", + zap.Int64("compact to segment ID", segmentCompactTo.GetID()), + zap.Int64s("compact from segment IDs", compactFromIDs)) m.Lock() defer m.Unlock() @@ -911,6 +955,9 @@ func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, s if segmentCompactTo.GetNumOfRows() > 0 { m.segments.SetSegment(segmentCompactTo.GetID(), segmentCompactTo) } + log.Info("meta update: alter in memory meta after compaction - complete", + zap.Int64("compact to segment ID", segmentCompactTo.GetID()), + zap.Int64s("compact from segment IDs", compactFromIDs)) } func (m *meta) updateBinlogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog, adds []*datapb.FieldBinlog) []*datapb.FieldBinlog {