mirror of https://github.com/milvus-io/milvus.git
Add logs to better track DataCoord meta update (#19485)
/kind improvement Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com> Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>pull/19580/head
parent
45f5007410
commit
b8b2e8eb92
|
@ -87,10 +87,14 @@ func (m *meta) reloadFromKV() error {
|
|||
// AddCollection adds a collection into meta
|
||||
// Note that collection info is just for caching and will not be set into etcd from datacoord
|
||||
func (m *meta) AddCollection(collection *datapb.CollectionInfo) {
|
||||
log.Info("meta update: add collection",
|
||||
zap.Int64("collection ID", collection.GetID()))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
m.collections[collection.ID] = collection
|
||||
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections)))
|
||||
log.Info("meta update: add collection - complete",
|
||||
zap.Int64("collection ID", collection.GetID()))
|
||||
}
|
||||
|
||||
// GetCollection returns collection info with provided collection id from local cache
|
||||
|
@ -106,13 +110,13 @@ func (m *meta) GetCollection(collectionID UniqueID) *datapb.CollectionInfo {
|
|||
|
||||
// chanPartSegments is an internal result struct, which is aggregates of SegmentInfos with same collectionID, partitionID and channelName
|
||||
type chanPartSegments struct {
|
||||
collecionID UniqueID
|
||||
partitionID UniqueID
|
||||
channelName string
|
||||
segments []*SegmentInfo
|
||||
collectionID UniqueID
|
||||
partitionID UniqueID
|
||||
channelName string
|
||||
segments []*SegmentInfo
|
||||
}
|
||||
|
||||
// GetSegmentsChanPart returns segments organized in Channel-Parition dimension with selector applied
|
||||
// GetSegmentsChanPart returns segments organized in Channel-Partition dimension with selector applied
|
||||
func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegments {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
@ -126,9 +130,9 @@ func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegm
|
|||
entry, ok := mDimEntry[dim]
|
||||
if !ok {
|
||||
entry = &chanPartSegments{
|
||||
collecionID: segmentInfo.CollectionID,
|
||||
partitionID: segmentInfo.PartitionID,
|
||||
channelName: segmentInfo.InsertChannel,
|
||||
collectionID: segmentInfo.CollectionID,
|
||||
partitionID: segmentInfo.PartitionID,
|
||||
channelName: segmentInfo.InsertChannel,
|
||||
}
|
||||
mDimEntry[dim] = entry
|
||||
}
|
||||
|
@ -158,29 +162,45 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
|
|||
|
||||
// AddSegment records segment info, persisting info into kv store
|
||||
func (m *meta) AddSegment(segment *SegmentInfo) error {
|
||||
log.Info("meta update: adding segment",
|
||||
zap.Int64("segment ID", segment.GetID()))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
if err := m.catalog.AddSegment(m.ctx, segment.SegmentInfo); err != nil {
|
||||
log.Error("meta update: adding segment failed",
|
||||
zap.Int64("segment ID", segment.GetID()),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
m.segments.SetSegment(segment.GetID(), segment)
|
||||
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc()
|
||||
log.Info("meta update: adding segment - complete",
|
||||
zap.Int64("segment ID", segment.GetID()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// DropSegment remove segment with provided id, etcd persistence also removed
|
||||
func (m *meta) DropSegment(segmentID UniqueID) error {
|
||||
log.Info("meta update: dropping segment",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
segment := m.segments.GetSegment(segmentID)
|
||||
if segment == nil {
|
||||
log.Warn("meta update: dropping segment failed - segment not found",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
return nil
|
||||
}
|
||||
if err := m.catalog.DropSegment(m.ctx, segment.SegmentInfo); err != nil {
|
||||
log.Warn("meta update: dropping segment failed",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Inc()
|
||||
m.segments.DropSegment(segmentID)
|
||||
log.Info("meta update: dropping segment - complete",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -218,11 +238,17 @@ func (m *meta) GetAllSegment(segID UniqueID) *SegmentInfo {
|
|||
|
||||
// SetState setting segment with provided ID state
|
||||
func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) error {
|
||||
log.Info("meta update: setting segment state",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Any("target state", targetState))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
curSegInfo := m.segments.GetSegment(segmentID)
|
||||
if curSegInfo == nil {
|
||||
// TODO: Should return error instead.
|
||||
log.Warn("meta update: setting segment state - segment not found",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Any("target state", targetState))
|
||||
// TODO: Should probably return error instead.
|
||||
return nil
|
||||
}
|
||||
// Persist segment updates first.
|
||||
|
@ -231,9 +257,9 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
|
|||
oldState := curSegInfo.GetState()
|
||||
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
|
||||
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
|
||||
log.Error("failed to alter segments",
|
||||
log.Error("meta update: setting segment state - failed to alter segments",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Any("target state", targetState),
|
||||
zap.String("target state", targetState.String()),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
@ -248,11 +274,16 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
|
|||
}
|
||||
// Update in-memory meta.
|
||||
m.segments.SetState(segmentID, targetState)
|
||||
log.Info("meta update: setting segment state - complete",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.String("target state", targetState.String()))
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsetIsImporting removes the `isImporting` flag of a segment.
|
||||
func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
|
||||
log.Info("meta update: unsetting isImport state of segment",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
curSegInfo := m.segments.GetSegment(segmentID)
|
||||
|
@ -263,10 +294,8 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
|
|||
clonedSegment := curSegInfo.Clone()
|
||||
clonedSegment.IsImporting = false
|
||||
if isSegmentHealthy(clonedSegment) {
|
||||
log.Info("unsetting isImport state of segment",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
|
||||
log.Error("failed to unset segment isImporting state",
|
||||
log.Error("meta update: unsetting isImport state of segment - failed to unset segment isImporting state",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Error(err))
|
||||
return err
|
||||
|
@ -274,6 +303,8 @@ func (m *meta) UnsetIsImporting(segmentID UniqueID) error {
|
|||
}
|
||||
// Update in-memory meta.
|
||||
m.segments.SetIsImporting(segmentID, false)
|
||||
log.Info("meta update: unsetting isImport state of segment - complete",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -289,10 +320,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
checkpoints []*datapb.CheckPoint,
|
||||
startPositions []*datapb.SegmentStartPosition,
|
||||
) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
log.Info("update flush segments info",
|
||||
log.Info("meta update: update flush segments info",
|
||||
zap.Int64("segmentId", segmentID),
|
||||
zap.Int("binlog", len(binlogs)),
|
||||
zap.Int("stats log", len(statslogs)),
|
||||
|
@ -302,24 +330,28 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
zap.Any("check points", checkpoints),
|
||||
zap.Any("start position", startPositions),
|
||||
zap.Bool("importing", importing))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
segment := m.segments.GetSegment(segmentID)
|
||||
if importing {
|
||||
m.segments.SetRowCount(segmentID, segment.currRows)
|
||||
segment = m.segments.GetSegment(segmentID)
|
||||
}
|
||||
if segment == nil || !isSegmentHealthy(segment) {
|
||||
log.Warn("meta update: update flush segments info - segment not found",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Bool("segment nil", segment == nil),
|
||||
zap.Bool("segment unhealthy", !isSegmentHealthy(segment)))
|
||||
return nil
|
||||
}
|
||||
|
||||
clonedSegment := segment.Clone()
|
||||
|
||||
modSegments := make(map[UniqueID]*SegmentInfo)
|
||||
|
||||
if flushed {
|
||||
clonedSegment.State = commonpb.SegmentState_Flushing
|
||||
modSegments[segmentID] = clonedSegment
|
||||
}
|
||||
|
||||
if dropped {
|
||||
clonedSegment.State = commonpb.SegmentState_Dropped
|
||||
clonedSegment.DroppedAt = uint64(time.Now().UnixNano())
|
||||
|
@ -327,7 +359,6 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
}
|
||||
// TODO add diff encoding and compression
|
||||
currBinlogs := clonedSegment.GetBinlogs()
|
||||
|
||||
var getFieldBinlogs = func(id UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog {
|
||||
for _, binlog := range binlogs {
|
||||
if id == binlog.GetFieldID() {
|
||||
|
@ -361,9 +392,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
}
|
||||
}
|
||||
clonedSegment.Deltalogs = currDeltaLogs
|
||||
|
||||
modSegments[segmentID] = clonedSegment
|
||||
|
||||
var getClonedSegment = func(segmentID UniqueID) *SegmentInfo {
|
||||
if s, ok := modSegments[segmentID]; ok {
|
||||
return s
|
||||
|
@ -373,7 +402,6 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, pos := range startPositions {
|
||||
if len(pos.GetStartPosition().GetMsgID()) == 0 {
|
||||
continue
|
||||
|
@ -386,7 +414,6 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
s.StartPosition = pos.GetStartPosition()
|
||||
modSegments[pos.GetSegmentID()] = s
|
||||
}
|
||||
|
||||
for _, cp := range checkpoints {
|
||||
s := getClonedSegment(cp.GetSegmentID())
|
||||
if s == nil {
|
||||
|
@ -402,13 +429,13 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
s.NumOfRows = cp.GetNumOfRows()
|
||||
modSegments[cp.GetSegmentID()] = s
|
||||
}
|
||||
|
||||
segments := make([]*datapb.SegmentInfo, 0, len(modSegments))
|
||||
for _, seg := range modSegments {
|
||||
segments = append(segments, seg.SegmentInfo)
|
||||
}
|
||||
if err := m.catalog.AlterSegments(m.ctx, segments); err != nil {
|
||||
log.Error("failed to store flush segment info into Etcd", zap.Error(err))
|
||||
log.Error("meta update: update flush segments info - failed to store flush segment info into Etcd",
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
oldSegmentState := segment.GetState()
|
||||
|
@ -425,19 +452,20 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
for id, s := range modSegments {
|
||||
m.segments.SetSegment(id, s)
|
||||
}
|
||||
log.Info("update flush segments info successfully", zap.Int64("segmentId", segmentID))
|
||||
log.Info("meta update: update flush segments info - update flush segments info successfully",
|
||||
zap.Int64("segment ID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateDropChannelSegmentInfo updates segment checkpoints and binlogs before drop
|
||||
// reusing segment info to pass segment id, binlogs, statslog, deltalog, start position and checkpoint
|
||||
func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentInfo) error {
|
||||
log.Info("meta update: update drop channel segment info",
|
||||
zap.String("channel", channel))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
modSegments := make(map[UniqueID]*SegmentInfo)
|
||||
originSegments := make(map[UniqueID]*SegmentInfo)
|
||||
|
||||
// save new segments flushed from buffer data
|
||||
for _, seg2Drop := range segments {
|
||||
segment := m.mergeDropSegment(seg2Drop)
|
||||
|
@ -470,6 +498,14 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
|
|||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
log.Info("meta update: update drop channel segment info failed",
|
||||
zap.String("channel", channel),
|
||||
zap.Error(err))
|
||||
} else {
|
||||
log.Info("meta update: update drop channel segment info - complete",
|
||||
zap.String("channel", channel))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -739,18 +775,23 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
|
|||
|
||||
// AddAllocation add allocation in segment
|
||||
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
|
||||
log.Info("meta update: add allocation",
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.Any("allocation", allocation))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
curSegInfo := m.segments.GetSegment(segmentID)
|
||||
if curSegInfo == nil {
|
||||
// TODO: Error handling.
|
||||
log.Warn("meta update: add allocation failed - segment not found",
|
||||
zap.Int64("segmentID", segmentID))
|
||||
return nil
|
||||
}
|
||||
// Persist segment updates first.
|
||||
clonedSegment := curSegInfo.Clone(AddAllocation(allocation))
|
||||
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
|
||||
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
|
||||
log.Error("failed to add allocation for segment",
|
||||
log.Error("meta update: add allocation failed",
|
||||
zap.Int64("segment ID", segmentID),
|
||||
zap.Error(err))
|
||||
return err
|
||||
|
@ -758,6 +799,8 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
|
|||
}
|
||||
// Update in-memory meta.
|
||||
m.segments.AddAllocation(segmentID, allocation)
|
||||
log.Info("meta update: add allocation - complete",
|
||||
zap.Int64("segmentID", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -793,20 +836,13 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
|
|||
m.segments.SetIsCompacting(segmentID, compacting)
|
||||
}
|
||||
|
||||
// SetSegmentIsImporting sets the importing state for a segment.
|
||||
func (m *meta) SetSegmentIsImporting(segmentID UniqueID, importing bool) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.segments.SetIsImporting(segmentID, importing)
|
||||
}
|
||||
|
||||
// GetCompleteComapctionMeta returns
|
||||
// GetCompleteCompactionMeta returns
|
||||
// - the segment info of compactedFrom segments before compaction to revert
|
||||
// - the segment info of compactedFrom segments after compaction to alter
|
||||
// - the segment info of compactedTo segment after compaction to add
|
||||
// The compactedTo segment could contain 0 numRows
|
||||
func (m *meta) GetCompleteCompactionMeta(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo) {
|
||||
log.Info("meta update: get complete compaction meta")
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
|
@ -876,7 +912,8 @@ func (m *meta) GetCompleteCompactionMeta(compactionLogs []*datapb.CompactionSegm
|
|||
}
|
||||
segment := NewSegmentInfo(segmentInfo)
|
||||
|
||||
log.Info("GetCompleteCompactionMeta", zap.Int64("segmentID", segmentInfo.ID),
|
||||
log.Info("meta update: get complete compaction meta - complete",
|
||||
zap.Int64("segmentID", segmentInfo.ID),
|
||||
zap.Int64("collectionID", segmentInfo.CollectionID),
|
||||
zap.Int64("partitionID", segmentInfo.PartitionID),
|
||||
zap.Int64("NumOfRows", segmentInfo.NumOfRows),
|
||||
|
@ -900,6 +937,13 @@ func (m *meta) revertAlterMetaStoreAfterCompaction(oldSegments []*datapb.Segment
|
|||
}
|
||||
|
||||
func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) {
|
||||
var compactFromIDs []int64
|
||||
for _, v := range segmentsCompactFrom {
|
||||
compactFromIDs = append(compactFromIDs, v.GetID())
|
||||
}
|
||||
log.Info("meta update: alter in memory meta after compaction",
|
||||
zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
|
||||
zap.Int64s("compact from segment IDs", compactFromIDs))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
|
@ -911,6 +955,9 @@ func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, s
|
|||
if segmentCompactTo.GetNumOfRows() > 0 {
|
||||
m.segments.SetSegment(segmentCompactTo.GetID(), segmentCompactTo)
|
||||
}
|
||||
log.Info("meta update: alter in memory meta after compaction - complete",
|
||||
zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
|
||||
zap.Int64s("compact from segment IDs", compactFromIDs))
|
||||
}
|
||||
|
||||
func (m *meta) updateBinlogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog, adds []*datapb.FieldBinlog) []*datapb.FieldBinlog {
|
||||
|
|
Loading…
Reference in New Issue