diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 51ed90a1db..0708d78f98 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -176,8 +176,20 @@ func (m *meta) AddCollection(collection *collectionInfo) { // DropCollection drop a collection from meta func (m *meta) DropCollection(collectionID int64) { log.Info("meta update: drop collection", zap.Int64("collectionID", collectionID)) + segments := m.SelectSegments(WithCollection(collectionID)) m.Lock() defer m.Unlock() + coll, ok := m.collections[collectionID] + if ok { + metrics.CleanupDataCoordNumStoredRows(coll.DatabaseName, collectionID) + metrics.CleanupDataCoordBulkInsertVectors(coll.DatabaseName, collectionID) + for _, seg := range segments { + metrics.CleanupDataCoordSegmentMetrics(coll.DatabaseName, collectionID, seg.ID) + } + } else { + log.Warn("not found database name", zap.Int64("collectionID", collectionID)) + } + delete(m.collections, collectionID) metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections))) log.Info("meta update: drop collection - complete", zap.Int64("collectionID", collectionID)) @@ -321,6 +333,8 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64, map[UniqueI // GetCollectionIndexFilesSize returns the total index files size of all segment for each collection. func (m *meta) GetCollectionIndexFilesSize() uint64 { + m.RLock() + defer m.RUnlock() var total uint64 for _, segmentIdx := range m.indexMeta.GetAllSegIndexes() { coll, ok := m.collections[segmentIdx.CollectionID] @@ -385,6 +399,11 @@ func (m *meta) DropSegment(segmentID UniqueID) error { return err } metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Dec() + coll, ok := m.collections[segment.CollectionID] + if ok { + metrics.CleanupDataCoordSegmentMetrics(coll.DatabaseName, segment.CollectionID, segment.ID) + } + m.segments.DropSegment(segmentID) log.Info("meta update: dropping segment - complete", zap.Int64("segmentID", segmentID)) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 22e4a2d585..146d3ebe92 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -586,7 +586,6 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual return resp, nil } - var collectionID int64 segments := make([]*SegmentInfo, 0, len(req.GetSegments())) for _, seg2Drop := range req.GetSegments() { info := &datapb.SegmentInfo{ @@ -602,7 +601,6 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual } segment := NewSegmentInfo(info) segments = append(segments, segment) - collectionID = seg2Drop.GetCollectionID() } err := s.meta.UpdateDropChannelSegmentInfo(channel, segments) @@ -619,11 +617,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual } s.segmentManager.DropSegmentsOfChannel(ctx, channel) s.compactionHandler.removeTasksByChannel(channel) - - metrics.CleanupDataCoordNumStoredRows(collectionID) metrics.DataCoordCheckpointUnixSeconds.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel) - metrics.CleanupDataCoordBulkInsertVectors(collectionID) - // no compaction triggered in Drop procedure return resp, nil } diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index f9b9c86ce6..eaa33ba18f 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -445,7 +445,6 @@ func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) return err } - metrics.CleanupDataCoordSegmentMetrics(segment.CollectionID, segment.ID) return nil } diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index dfd8749f53..0fb96d9cff 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -322,34 +322,38 @@ func RegisterDataCoord(registry *prometheus.Registry) { registry.MustRegister(GarbageCollectorRunCount) } -func CleanupDataCoordSegmentMetrics(collectionID int64, segmentID int64) { +func CleanupDataCoordSegmentMetrics(dbName string, collectionID int64, segmentID int64) { DataCoordSegmentBinLogFileCount. Delete( prometheus.Labels{ collectionIDLabelName: fmt.Sprint(collectionID), segmentIDLabelName: fmt.Sprint(segmentID), }) - DataCoordStoredBinlogSize.DeletePartialMatch(prometheus.Labels{ + DataCoordStoredBinlogSize.Delete(prometheus.Labels{ + databaseLabelName: dbName, collectionIDLabelName: fmt.Sprint(collectionID), segmentIDLabelName: fmt.Sprint(segmentID), }) - DataCoordStoredIndexFilesSize.DeletePartialMatch(prometheus.Labels{ + DataCoordStoredIndexFilesSize.Delete(prometheus.Labels{ + databaseLabelName: dbName, collectionIDLabelName: fmt.Sprint(collectionID), segmentIDLabelName: fmt.Sprint(segmentID), }) } -func CleanupDataCoordNumStoredRows(collectionID int64) { +func CleanupDataCoordNumStoredRows(dbName string, collectionID int64) { for _, state := range commonpb.SegmentState_name { - DataCoordNumStoredRows.DeletePartialMatch(prometheus.Labels{ + DataCoordNumStoredRows.Delete(prometheus.Labels{ + databaseLabelName: dbName, collectionIDLabelName: fmt.Sprint(collectionID), segmentStateLabelName: fmt.Sprint(state), }) } } -func CleanupDataCoordBulkInsertVectors(collectionID int64) { - DataCoordBulkVectors.DeletePartialMatch(prometheus.Labels{ +func CleanupDataCoordBulkInsertVectors(dbName string, collectionID int64) { + DataCoordBulkVectors.Delete(prometheus.Labels{ + databaseLabelName: dbName, collectionIDLabelName: fmt.Sprint(collectionID), }) }