mirror of https://github.com/milvus-io/milvus.git
enhance: [10kcp] Add secondary index for querynode segment manager (#38312)
Cherry pick from pr #38311 Related to #37630 Add secondary index with vchannel to reduce `GetBy` rlock holding time when segment number is large. Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/38324/head
parent
3e65cc5850
commit
24a055996b
|
@ -50,117 +50,6 @@ import (
|
|||
// TODO maybe move to manager and change segment constructor
|
||||
var channelMapper = metautil.NewDynChannelMapper()
|
||||
|
||||
// SegmentFilter is the interface for segment selection criteria.
|
||||
type SegmentFilter interface {
|
||||
Filter(segment Segment) bool
|
||||
SegmentType() (SegmentType, bool)
|
||||
SegmentIDs() ([]int64, bool)
|
||||
}
|
||||
|
||||
// SegmentFilterFunc is a type wrapper for `func(Segment) bool` to SegmentFilter.
|
||||
type SegmentFilterFunc func(segment Segment) bool
|
||||
|
||||
func (f SegmentFilterFunc) Filter(segment Segment) bool {
|
||||
return f(segment)
|
||||
}
|
||||
|
||||
func (f SegmentFilterFunc) SegmentType() (SegmentType, bool) {
|
||||
return commonpb.SegmentState_SegmentStateNone, false
|
||||
}
|
||||
|
||||
func (s SegmentFilterFunc) SegmentIDs() ([]int64, bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// SegmentIDFilter is the specific segment filter for SegmentID only.
|
||||
type SegmentIDFilter int64
|
||||
|
||||
func (f SegmentIDFilter) Filter(segment Segment) bool {
|
||||
return segment.ID() == int64(f)
|
||||
}
|
||||
|
||||
func (f SegmentIDFilter) SegmentType() (SegmentType, bool) {
|
||||
return commonpb.SegmentState_SegmentStateNone, false
|
||||
}
|
||||
|
||||
func (f SegmentIDFilter) SegmentIDs() ([]int64, bool) {
|
||||
return []int64{int64(f)}, true
|
||||
}
|
||||
|
||||
type SegmentIDsFilter struct {
|
||||
segmentIDs typeutil.Set[int64]
|
||||
}
|
||||
|
||||
func (f SegmentIDsFilter) Filter(segment Segment) bool {
|
||||
return f.segmentIDs.Contain(segment.ID())
|
||||
}
|
||||
|
||||
func (f SegmentIDsFilter) SegmentType() (SegmentType, bool) {
|
||||
return commonpb.SegmentState_SegmentStateNone, false
|
||||
}
|
||||
|
||||
func (f SegmentIDsFilter) SegmentIDs() ([]int64, bool) {
|
||||
return f.segmentIDs.Collect(), true
|
||||
}
|
||||
|
||||
type SegmentTypeFilter SegmentType
|
||||
|
||||
func (f SegmentTypeFilter) Filter(segment Segment) bool {
|
||||
return segment.Type() == SegmentType(f)
|
||||
}
|
||||
|
||||
func (f SegmentTypeFilter) SegmentType() (SegmentType, bool) {
|
||||
return SegmentType(f), true
|
||||
}
|
||||
|
||||
func (f SegmentTypeFilter) SegmentIDs() ([]int64, bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func WithSkipEmpty() SegmentFilter {
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return segment.InsertCount() > 0
|
||||
})
|
||||
}
|
||||
|
||||
func WithPartition(partitionID typeutil.UniqueID) SegmentFilter {
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return segment.Partition() == partitionID
|
||||
})
|
||||
}
|
||||
|
||||
func WithChannel(channel string) SegmentFilter {
|
||||
ac, err := metautil.ParseChannel(channel, channelMapper)
|
||||
if err != nil {
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return false
|
||||
})
|
||||
}
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return segment.Shard().Equal(ac)
|
||||
})
|
||||
}
|
||||
|
||||
func WithType(typ SegmentType) SegmentFilter {
|
||||
return SegmentTypeFilter(typ)
|
||||
}
|
||||
|
||||
func WithID(id int64) SegmentFilter {
|
||||
return SegmentIDFilter(id)
|
||||
}
|
||||
|
||||
func WithIDs(ids ...int64) SegmentFilter {
|
||||
return SegmentIDsFilter{
|
||||
segmentIDs: typeutil.NewSet(ids...),
|
||||
}
|
||||
}
|
||||
|
||||
func WithLevel(level datapb.SegmentLevel) SegmentFilter {
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return segment.Level() == level
|
||||
})
|
||||
}
|
||||
|
||||
type SegmentAction func(segment Segment) bool
|
||||
|
||||
func IncreaseVersion(version int64) SegmentAction {
|
||||
|
@ -313,12 +202,156 @@ type SegmentManager interface {
|
|||
|
||||
var _ SegmentManager = (*segmentManager)(nil)
|
||||
|
||||
type secondarySegmentIndex struct {
|
||||
shardSegments map[metautil.Channel]segments
|
||||
}
|
||||
|
||||
func newSecondarySegmentIndex() secondarySegmentIndex {
|
||||
return secondarySegmentIndex{
|
||||
shardSegments: make(map[metautil.Channel]segments),
|
||||
}
|
||||
}
|
||||
|
||||
func (si secondarySegmentIndex) Put(ctx context.Context, segmentType SegmentType, segment Segment) {
|
||||
shard := segment.Shard()
|
||||
segments, ok := si.shardSegments[shard]
|
||||
if !ok {
|
||||
segments = newSegments()
|
||||
si.shardSegments[shard] = segments
|
||||
}
|
||||
segments.Put(ctx, segmentType, segment)
|
||||
}
|
||||
|
||||
func (si secondarySegmentIndex) Remove(s Segment) {
|
||||
shard := s.Shard()
|
||||
segments, ok := si.shardSegments[shard]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
segments.Remove(s)
|
||||
if segments.Empty() {
|
||||
delete(si.shardSegments, shard)
|
||||
}
|
||||
}
|
||||
|
||||
type segments struct {
|
||||
growingSegments map[typeutil.UniqueID]Segment
|
||||
sealedSegments map[typeutil.UniqueID]Segment
|
||||
}
|
||||
|
||||
func (segments segments) Put(_ context.Context, segmentType SegmentType, segment Segment) {
|
||||
switch segmentType {
|
||||
case SegmentTypeGrowing:
|
||||
segments.growingSegments[segment.ID()] = segment
|
||||
case SegmentTypeSealed:
|
||||
segments.sealedSegments[segment.ID()] = segment
|
||||
}
|
||||
}
|
||||
|
||||
func (segments segments) Get(segmentID int64) (Segment, bool) {
|
||||
if segment, ok := segments.growingSegments[segmentID]; ok {
|
||||
return segment, true
|
||||
} else if segment, ok = segments.sealedSegments[segmentID]; ok {
|
||||
return segment, true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (segments segments) GetWithType(segmentID int64, segmentType SegmentType) (Segment, bool) {
|
||||
// var targetMap map[int64]Segment
|
||||
var segment Segment
|
||||
var ok bool
|
||||
switch segmentType {
|
||||
case SegmentTypeGrowing:
|
||||
segment, ok = segments.growingSegments[segmentID]
|
||||
case SegmentTypeSealed:
|
||||
segment, ok = segments.sealedSegments[segmentID]
|
||||
}
|
||||
return segment, ok
|
||||
}
|
||||
|
||||
func (segments segments) RemoveWithType(segmentID int64, segmentType SegmentType) (Segment, bool) {
|
||||
var segment Segment
|
||||
var ok bool
|
||||
switch segmentType {
|
||||
case SegmentTypeGrowing:
|
||||
segment, ok = segments.growingSegments[segmentID]
|
||||
delete(segments.growingSegments, segmentID)
|
||||
case SegmentTypeSealed:
|
||||
segment, ok = segments.sealedSegments[segmentID]
|
||||
delete(segments.sealedSegments, segmentID)
|
||||
}
|
||||
return segment, ok
|
||||
}
|
||||
|
||||
func (segments segments) Remove(segment Segment) {
|
||||
switch segment.Type() {
|
||||
case SegmentTypeGrowing:
|
||||
delete(segments.growingSegments, segment.ID())
|
||||
case SegmentTypeSealed:
|
||||
delete(segments.sealedSegments, segment.ID())
|
||||
}
|
||||
}
|
||||
|
||||
func (segments segments) RangeWithFilter(criterion *segmentCriterion, process func(id int64, segType SegmentType, segment Segment) bool) {
|
||||
if criterion.segmentIDs != nil {
|
||||
for id := range criterion.segmentIDs {
|
||||
var segment Segment
|
||||
var ok bool
|
||||
if criterion.segmentType == commonpb.SegmentState_SegmentStateNone {
|
||||
segment, ok = segments.Get(id)
|
||||
} else {
|
||||
segment, ok = segments.GetWithType(id, criterion.segmentType)
|
||||
}
|
||||
|
||||
if ok && criterion.Match(segment) {
|
||||
if !process(id, segment.Type(), segment) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var candidates []map[int64]Segment
|
||||
switch criterion.segmentType {
|
||||
case SegmentTypeGrowing:
|
||||
candidates = []map[int64]Segment{segments.growingSegments}
|
||||
case SegmentTypeSealed:
|
||||
candidates = []map[int64]Segment{segments.sealedSegments}
|
||||
default:
|
||||
candidates = []map[int64]Segment{segments.growingSegments, segments.sealedSegments}
|
||||
}
|
||||
|
||||
for _, candidate := range candidates {
|
||||
for id, segment := range candidate {
|
||||
if criterion.Match(segment) {
|
||||
if !process(id, segment.Type(), segment) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (segments segments) Empty() bool {
|
||||
return len(segments.growingSegments) == 0 && len(segments.sealedSegments) == 0
|
||||
}
|
||||
|
||||
func newSegments() segments {
|
||||
return segments{
|
||||
growingSegments: make(map[int64]Segment),
|
||||
sealedSegments: make(map[int64]Segment),
|
||||
}
|
||||
}
|
||||
|
||||
// Manager manages all collections and segments
|
||||
type segmentManager struct {
|
||||
mu sync.RWMutex // guards all
|
||||
|
||||
growingSegments map[typeutil.UniqueID]Segment
|
||||
sealedSegments map[typeutil.UniqueID]Segment
|
||||
globalSegments segments
|
||||
secondaryIndex secondarySegmentIndex
|
||||
|
||||
// releaseCallback is the callback function when a segment is released.
|
||||
releaseCallback func(s Segment)
|
||||
|
@ -329,30 +362,27 @@ type segmentManager struct {
|
|||
|
||||
func NewSegmentManager() *segmentManager {
|
||||
return &segmentManager{
|
||||
growingSegments: make(map[int64]Segment),
|
||||
sealedSegments: make(map[int64]Segment),
|
||||
globalSegments: newSegments(),
|
||||
secondaryIndex: newSecondarySegmentIndex(),
|
||||
growingOnReleasingSegments: typeutil.NewUniqueSet(),
|
||||
sealedOnReleasingSegments: typeutil.NewUniqueSet(),
|
||||
}
|
||||
}
|
||||
|
||||
// put is the internal put method updating both global segments and secondary index.
|
||||
func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, segment Segment) {
|
||||
mgr.globalSegments.Put(ctx, segmentType, segment)
|
||||
mgr.secondaryIndex.Put(ctx, segmentType, segment)
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, segments ...Segment) {
|
||||
var replacedSegment []Segment
|
||||
mgr.mu.Lock()
|
||||
defer mgr.mu.Unlock()
|
||||
var targetMap map[int64]Segment
|
||||
switch segmentType {
|
||||
case SegmentTypeGrowing:
|
||||
targetMap = mgr.growingSegments
|
||||
case SegmentTypeSealed:
|
||||
targetMap = mgr.sealedSegments
|
||||
default:
|
||||
panic("unexpected segment type")
|
||||
}
|
||||
|
||||
log := log.Ctx(ctx)
|
||||
for _, segment := range segments {
|
||||
oldSegment, ok := targetMap[segment.ID()]
|
||||
|
||||
oldSegment, ok := mgr.globalSegments.GetWithType(segment.ID(), segmentType)
|
||||
if ok {
|
||||
if oldSegment.Version() >= segment.Version() {
|
||||
log.Warn("Invalid segment distribution changed, skip it",
|
||||
|
@ -366,7 +396,8 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg
|
|||
}
|
||||
replacedSegment = append(replacedSegment, oldSegment)
|
||||
}
|
||||
targetMap[segment.ID()] = segment
|
||||
|
||||
mgr.put(ctx, segmentType, segment)
|
||||
|
||||
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Segment %d[%d] loaded", segment.ID(), segment.Collection())))
|
||||
metrics.QueryNodeNumSegments.WithLabelValues(
|
||||
|
@ -409,17 +440,18 @@ func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilt
|
|||
func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) bool {
|
||||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
|
||||
_, ok := mgr.globalSegments.GetWithType(segmentID, typ)
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
switch typ {
|
||||
case SegmentTypeGrowing:
|
||||
if _, ok := mgr.growingSegments[segmentID]; ok {
|
||||
return true
|
||||
} else if mgr.growingOnReleasingSegments.Contain(segmentID) {
|
||||
if mgr.growingOnReleasingSegments.Contain(segmentID) {
|
||||
return true
|
||||
}
|
||||
case SegmentTypeSealed:
|
||||
if _, ok := mgr.sealedSegments[segmentID]; ok {
|
||||
return true
|
||||
} else if mgr.sealedOnReleasingSegments.Contain(segmentID) {
|
||||
if mgr.sealedOnReleasingSegments.Contain(segmentID) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -431,27 +463,16 @@ func (mgr *segmentManager) Get(segmentID typeutil.UniqueID) Segment {
|
|||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
|
||||
if segment, ok := mgr.growingSegments[segmentID]; ok {
|
||||
return segment
|
||||
} else if segment, ok = mgr.sealedSegments[segmentID]; ok {
|
||||
return segment
|
||||
}
|
||||
|
||||
return nil
|
||||
segment, _ := mgr.globalSegments.Get(segmentID)
|
||||
return segment
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) GetWithType(segmentID typeutil.UniqueID, typ SegmentType) Segment {
|
||||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
|
||||
switch typ {
|
||||
case SegmentTypeSealed:
|
||||
return mgr.sealedSegments[segmentID]
|
||||
case SegmentTypeGrowing:
|
||||
return mgr.growingSegments[segmentID]
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
segment, _ := mgr.globalSegments.GetWithType(segmentID, typ)
|
||||
return segment
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) GetBy(filters ...SegmentFilter) []Segment {
|
||||
|
@ -512,36 +533,31 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter)
|
|||
}()
|
||||
|
||||
for _, id := range segments {
|
||||
growing, growingExist := mgr.growingSegments[id]
|
||||
sealed, sealedExist := mgr.sealedSegments[id]
|
||||
// growing, growingExist := mgr.growingSegments[id]
|
||||
// sealed, sealedExist := mgr.sealedSegments[id]
|
||||
segment, ok := mgr.globalSegments.Get(id)
|
||||
|
||||
// L0 Segment should not be queryable.
|
||||
if sealedExist && sealed.Level() == datapb.SegmentLevel_L0 {
|
||||
continue
|
||||
}
|
||||
|
||||
growingExist = growingExist && filter(growing, filters...)
|
||||
sealedExist = sealedExist && filter(sealed, filters...)
|
||||
|
||||
if growingExist {
|
||||
err = growing.PinIfNotReleased()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockedSegments = append(lockedSegments, growing)
|
||||
}
|
||||
if sealedExist {
|
||||
err = sealed.PinIfNotReleased()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockedSegments = append(lockedSegments, sealed)
|
||||
}
|
||||
|
||||
if !growingExist && !sealedExist {
|
||||
if !ok {
|
||||
err = merr.WrapErrSegmentNotLoaded(id, "segment not found")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// L0 Segment should not be queryable.
|
||||
if segment.Level() == datapb.SegmentLevel_L0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// growingExist = growingExist && filter(growing, filters...)
|
||||
// sealedExist = sealedExist && filter(sealed, filters...)
|
||||
if !filter(segment, filters...) {
|
||||
continue
|
||||
}
|
||||
|
||||
err = segment.PinIfNotReleased()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockedSegments = append(lockedSegments, segment)
|
||||
}
|
||||
|
||||
return lockedSegments, nil
|
||||
|
@ -554,74 +570,26 @@ func (mgr *segmentManager) Unpin(segments []Segment) {
|
|||
}
|
||||
|
||||
func (mgr *segmentManager) rangeWithFilter(process func(id int64, segType SegmentType, segment Segment) bool, filters ...SegmentFilter) {
|
||||
var segType SegmentType
|
||||
var hasSegType, hasSegIDs bool
|
||||
segmentIDs := typeutil.NewSet[int64]()
|
||||
|
||||
otherFilters := make([]SegmentFilter, 0, len(filters))
|
||||
criterion := &segmentCriterion{}
|
||||
for _, filter := range filters {
|
||||
if sType, ok := filter.SegmentType(); ok {
|
||||
segType = sType
|
||||
hasSegType = true
|
||||
continue
|
||||
}
|
||||
if segIDs, ok := filter.SegmentIDs(); ok {
|
||||
hasSegIDs = true
|
||||
segmentIDs.Insert(segIDs...)
|
||||
continue
|
||||
}
|
||||
otherFilters = append(otherFilters, filter)
|
||||
filter.AddFilter(criterion)
|
||||
}
|
||||
|
||||
mergedFilter := func(info Segment) bool {
|
||||
for _, filter := range otherFilters {
|
||||
if !filter.Filter(info) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
var candidates map[SegmentType]map[int64]Segment
|
||||
switch segType {
|
||||
case SegmentTypeSealed:
|
||||
candidates = map[SegmentType]map[int64]Segment{SegmentTypeSealed: mgr.sealedSegments}
|
||||
case SegmentTypeGrowing:
|
||||
candidates = map[SegmentType]map[int64]Segment{SegmentTypeGrowing: mgr.growingSegments}
|
||||
default:
|
||||
if !hasSegType {
|
||||
candidates = map[SegmentType]map[int64]Segment{
|
||||
SegmentTypeSealed: mgr.sealedSegments,
|
||||
SegmentTypeGrowing: mgr.growingSegments,
|
||||
}
|
||||
target := mgr.globalSegments
|
||||
var ok bool
|
||||
if !criterion.channel.IsZero() {
|
||||
target, ok = mgr.secondaryIndex.shardSegments[criterion.channel]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for segType, candidate := range candidates {
|
||||
if hasSegIDs {
|
||||
for id := range segmentIDs {
|
||||
segment, has := candidate[id]
|
||||
if has && mergedFilter(segment) {
|
||||
if !process(id, segType, segment) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for id, segment := range candidate {
|
||||
if mergedFilter(segment) {
|
||||
if !process(id, segType, segment) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
target.RangeWithFilter(criterion, process)
|
||||
}
|
||||
|
||||
func filter(segment Segment, filters ...SegmentFilter) bool {
|
||||
for _, filter := range filters {
|
||||
if !filter.Filter(segment) {
|
||||
if !filter.Match(segment) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -629,32 +597,18 @@ func filter(segment Segment, filters ...SegmentFilter) bool {
|
|||
}
|
||||
|
||||
func (mgr *segmentManager) GetSealed(segmentID typeutil.UniqueID) Segment {
|
||||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
|
||||
if segment, ok := mgr.sealedSegments[segmentID]; ok {
|
||||
return segment
|
||||
}
|
||||
|
||||
return nil
|
||||
return mgr.GetWithType(segmentID, SegmentTypeSealed)
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) GetGrowing(segmentID typeutil.UniqueID) Segment {
|
||||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
|
||||
if segment, ok := mgr.growingSegments[segmentID]; ok {
|
||||
return segment
|
||||
}
|
||||
|
||||
return nil
|
||||
return mgr.GetWithType(segmentID, SegmentTypeGrowing)
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) Empty() bool {
|
||||
mgr.mu.RLock()
|
||||
defer mgr.mu.RUnlock()
|
||||
|
||||
return len(mgr.growingSegments)+len(mgr.sealedSegments) == 0
|
||||
return len(mgr.globalSegments.growingSegments)+len(mgr.globalSegments.sealedSegments) == 0
|
||||
}
|
||||
|
||||
// returns true if the segment exists,
|
||||
|
@ -703,27 +657,21 @@ func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.Unique
|
|||
}
|
||||
|
||||
func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID typeutil.UniqueID) Segment {
|
||||
switch typ {
|
||||
case SegmentTypeGrowing:
|
||||
s, ok := mgr.growingSegments[segmentID]
|
||||
if ok {
|
||||
delete(mgr.growingSegments, segmentID)
|
||||
mgr.growingOnReleasingSegments.Insert(segmentID)
|
||||
return s
|
||||
}
|
||||
|
||||
case SegmentTypeSealed:
|
||||
s, ok := mgr.sealedSegments[segmentID]
|
||||
if ok {
|
||||
delete(mgr.sealedSegments, segmentID)
|
||||
mgr.sealedOnReleasingSegments.Insert(segmentID)
|
||||
return s
|
||||
}
|
||||
default:
|
||||
segment, ok := mgr.globalSegments.RemoveWithType(segmentID, typ)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
switch typ {
|
||||
case SegmentTypeGrowing:
|
||||
mgr.growingOnReleasingSegments.Insert(segmentID)
|
||||
case SegmentTypeSealed:
|
||||
mgr.sealedOnReleasingSegments.Insert(segmentID)
|
||||
}
|
||||
|
||||
mgr.secondaryIndex.Remove(segment)
|
||||
|
||||
return segment
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilter) (int, int) {
|
||||
|
@ -757,17 +705,17 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte
|
|||
func (mgr *segmentManager) Clear(ctx context.Context) {
|
||||
mgr.mu.Lock()
|
||||
|
||||
for id := range mgr.growingSegments {
|
||||
for id := range mgr.globalSegments.growingSegments {
|
||||
mgr.growingOnReleasingSegments.Insert(id)
|
||||
}
|
||||
growingWaitForRelease := mgr.growingSegments
|
||||
mgr.growingSegments = make(map[int64]Segment)
|
||||
growingWaitForRelease := mgr.globalSegments.growingSegments
|
||||
|
||||
for id := range mgr.sealedSegments {
|
||||
for id := range mgr.globalSegments.sealedSegments {
|
||||
mgr.sealedOnReleasingSegments.Insert(id)
|
||||
}
|
||||
sealedWaitForRelease := mgr.sealedSegments
|
||||
mgr.sealedSegments = make(map[int64]Segment)
|
||||
sealedWaitForRelease := mgr.globalSegments.sealedSegments
|
||||
mgr.globalSegments = newSegments()
|
||||
mgr.secondaryIndex = newSecondarySegmentIndex()
|
||||
mgr.updateMetric()
|
||||
mgr.mu.Unlock()
|
||||
|
||||
|
@ -787,21 +735,21 @@ func (mgr *segmentManager) registerReleaseCallback(callback func(s Segment)) {
|
|||
|
||||
func (mgr *segmentManager) updateMetric() {
|
||||
// update collection and partiation metric
|
||||
collections, partiations := make(typeutil.Set[int64]), make(typeutil.Set[int64])
|
||||
for _, seg := range mgr.growingSegments {
|
||||
collections, partitions := make(typeutil.Set[int64]), make(typeutil.Set[int64])
|
||||
for _, seg := range mgr.globalSegments.growingSegments {
|
||||
collections.Insert(seg.Collection())
|
||||
if seg.Partition() != common.AllPartitionsID {
|
||||
partiations.Insert(seg.Partition())
|
||||
partitions.Insert(seg.Partition())
|
||||
}
|
||||
}
|
||||
for _, seg := range mgr.sealedSegments {
|
||||
for _, seg := range mgr.globalSegments.sealedSegments {
|
||||
collections.Insert(seg.Collection())
|
||||
if seg.Partition() != common.AllPartitionsID {
|
||||
partiations.Insert(seg.Partition())
|
||||
partitions.Insert(seg.Partition())
|
||||
}
|
||||
}
|
||||
metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(collections.Len()))
|
||||
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partiations.Len()))
|
||||
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partitions.Len()))
|
||||
}
|
||||
|
||||
func (mgr *segmentManager) release(ctx context.Context, segment Segment) {
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package segments
|
||||
|
||||
import (
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
// segmentCriterion is the segment filter criterion obj.
|
||||
type segmentCriterion struct {
|
||||
segmentIDs typeutil.Set[int64]
|
||||
channel metautil.Channel
|
||||
segmentType SegmentType
|
||||
others []SegmentFilter
|
||||
}
|
||||
|
||||
func (c *segmentCriterion) Match(segment Segment) bool {
|
||||
for _, filter := range c.others {
|
||||
if !filter.Match(segment) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// SegmentFilter is the interface for segment selection criteria.
|
||||
type SegmentFilter interface {
|
||||
Match(segment Segment) bool
|
||||
AddFilter(*segmentCriterion)
|
||||
}
|
||||
|
||||
// SegmentFilterFunc is a type wrapper for `func(Segment) bool` to SegmentFilter.
|
||||
type SegmentFilterFunc func(segment Segment) bool
|
||||
|
||||
func (f SegmentFilterFunc) Match(segment Segment) bool {
|
||||
return f(segment)
|
||||
}
|
||||
|
||||
func (f SegmentFilterFunc) AddFilter(c *segmentCriterion) {
|
||||
c.others = append(c.others, f)
|
||||
}
|
||||
|
||||
// SegmentIDFilter is the specific segment filter for SegmentID only.
|
||||
type SegmentIDFilter int64
|
||||
|
||||
func (f SegmentIDFilter) Match(segment Segment) bool {
|
||||
return segment.ID() == int64(f)
|
||||
}
|
||||
|
||||
func (f SegmentIDFilter) AddFilter(c *segmentCriterion) {
|
||||
if c.segmentIDs == nil {
|
||||
c.segmentIDs = typeutil.NewSet(int64(f))
|
||||
return
|
||||
}
|
||||
c.segmentIDs = c.segmentIDs.Intersection(typeutil.NewSet(int64(f)))
|
||||
}
|
||||
|
||||
type SegmentIDsFilter struct {
|
||||
segmentIDs typeutil.Set[int64]
|
||||
}
|
||||
|
||||
func (f SegmentIDsFilter) Match(segment Segment) bool {
|
||||
return f.segmentIDs.Contain(segment.ID())
|
||||
}
|
||||
|
||||
func (f SegmentIDsFilter) AddFilter(c *segmentCriterion) {
|
||||
if c.segmentIDs == nil {
|
||||
c.segmentIDs = f.segmentIDs
|
||||
return
|
||||
}
|
||||
c.segmentIDs = c.segmentIDs.Intersection(f.segmentIDs)
|
||||
}
|
||||
|
||||
type SegmentTypeFilter SegmentType
|
||||
|
||||
func (f SegmentTypeFilter) Match(segment Segment) bool {
|
||||
return segment.Type() == SegmentType(f)
|
||||
}
|
||||
|
||||
func (f SegmentTypeFilter) AddFilter(c *segmentCriterion) {
|
||||
c.segmentType = SegmentType(f)
|
||||
}
|
||||
|
||||
func WithSkipEmpty() SegmentFilter {
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return segment.InsertCount() > 0
|
||||
})
|
||||
}
|
||||
|
||||
func WithPartition(partitionID typeutil.UniqueID) SegmentFilter {
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return segment.Partition() == partitionID
|
||||
})
|
||||
}
|
||||
|
||||
func WithChannel(channel string) SegmentFilter {
|
||||
ac, err := metautil.ParseChannel(channel, channelMapper)
|
||||
if err != nil {
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return false
|
||||
})
|
||||
}
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return segment.Shard().Equal(ac)
|
||||
})
|
||||
}
|
||||
|
||||
func WithType(typ SegmentType) SegmentFilter {
|
||||
return SegmentTypeFilter(typ)
|
||||
}
|
||||
|
||||
func WithID(id int64) SegmentFilter {
|
||||
return SegmentIDFilter(id)
|
||||
}
|
||||
|
||||
func WithIDs(ids ...int64) SegmentFilter {
|
||||
return SegmentIDsFilter{
|
||||
segmentIDs: typeutil.NewSet(ids...),
|
||||
}
|
||||
}
|
||||
|
||||
func WithLevel(level datapb.SegmentLevel) SegmentFilter {
|
||||
return SegmentFilterFunc(func(segment Segment) bool {
|
||||
return segment.Level() == level
|
||||
})
|
||||
}
|
|
@ -117,6 +117,10 @@ func (c Channel) EqualString(str string) bool {
|
|||
return c.Equal(ac)
|
||||
}
|
||||
|
||||
func (c Channel) IsZero() bool {
|
||||
return c.ChannelMapper == nil
|
||||
}
|
||||
|
||||
func ParseChannel(virtualName string, mapper ChannelMapper) (Channel, error) {
|
||||
if !channelNameFormat.MatchString(virtualName) {
|
||||
return Channel{}, merr.WrapErrParameterInvalidMsg("virtual channel name(%s) is not valid", virtualName)
|
||||
|
|
Loading…
Reference in New Issue