mirror of https://github.com/milvus-io/milvus.git
enhance: Optimize GetLocalDiskSize and segment loader mutex (#38599)
1. Make the segment loader mutex protect only the committed resource.
2. Optimize GetDiskUsage by caching disk usage in a periodically refreshed background fetcher instead of re-scanning the local path on every call.

issue: https://github.com/milvus-io/milvus/issues/37630

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
parent e501025bba
commit ce41778fe6
@@ -0,0 +1,81 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package segments
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/internal/util/segcore"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+type diskUsageFetcher struct {
+	ctx   context.Context
+	path  string
+	usage *atomic.Int64
+	err   *atomic.Error
+}
+
+func NewDiskUsageFetcher(ctx context.Context) *diskUsageFetcher {
+	return &diskUsageFetcher{
+		ctx:   ctx,
+		path:  paramtable.Get().LocalStorageCfg.Path.GetValue(),
+		usage: atomic.NewInt64(0),
+		err:   atomic.NewError(nil),
+	}
+}
+
+func (d *diskUsageFetcher) GetDiskUsage() (int64, error) {
+	return d.usage.Load(), d.err.Load()
+}
+
+func (d *diskUsageFetcher) fetch() {
+	diskUsage, err := segcore.GetLocalUsedSize(d.path)
+	if err != nil {
+		log.Warn("failed to get disk usage", zap.Error(err))
+		d.err.Store(err)
+		return
+	}
+	d.usage.Store(diskUsage)
+	d.err.Store(nil)
+	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(diskUsage) / 1024 / 1024) // in MB
+	log.Ctx(d.ctx).WithRateGroup("diskUsageFetcher", 1, 300).
+		RatedInfo(300, "querynode disk usage", zap.Int64("size", diskUsage), zap.Int64("nodeID", paramtable.GetNodeID()))
+}
+
+func (d *diskUsageFetcher) Start() {
+	d.fetch() // Synchronously fetch once before starting.
+
+	interval := paramtable.Get().QueryNodeCfg.DiskSizeFetchInterval.GetAsDuration(time.Second)
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-d.ctx.Done():
+			return
+		case <-ticker.C:
+			d.fetch()
+		}
+	}
+}
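The fetcher above trades a little freshness for cost: callers of GetDiskUsage only load an atomic value, and the local path is scanned once per interval instead of on every request. A self-contained sketch of the same pattern using only the standard library (names such as cachedUsage and statDisk are illustrative, not the Milvus API):

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type cachedUsage struct {
	usage atomic.Int64 // last observed disk usage in bytes
}

// refresh runs the expensive measurement once and publishes the result.
func (c *cachedUsage) refresh(statDisk func() int64) { c.usage.Store(statDisk()) }

// Get is cheap: it only loads an atomic and never touches the filesystem.
func (c *cachedUsage) Get() int64 { return c.usage.Load() }

func main() {
	statDisk := func() int64 { return 42 << 20 } // stand-in for walking the local path

	c := &cachedUsage{}
	c.refresh(statDisk) // fetch once synchronously before serving readers, as Start() does

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()
	go func() { // periodic background refresh
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				c.refresh(statDisk)
			}
		}
	}()

	fmt.Println("disk usage (bytes):", c.Get())
	<-ctx.Done()
}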
@@ -1294,11 +1294,6 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
 
 	GetDynamicPool().Submit(func() (any, error) {
 		C.DeleteSegment(ptr)
-		localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
-		// ignore error here, shall not block releasing
-		if err == nil {
-			metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
-		}
 		return nil, nil
 	}).Await()
 
@@ -44,7 +44,6 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/internal/util/segcore"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
@@ -167,6 +166,8 @@ func NewLoader(
 	}
 
 	log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
+	duf := NewDiskUsageFetcher(ctx)
+	go duf.Start()
 
 	warmupDispatcher := NewWarmupDispatcher()
 	go warmupDispatcher.Run(ctx)
@@ -175,6 +176,7 @@ func NewLoader(
 		cm:                        cm,
 		loadingSegments:           typeutil.NewConcurrentMap[int64, *loadResult](),
 		committedResourceNotifier: syncutil.NewVersionedNotifier(),
+		duf:                       duf,
 		warmupDispatcher:          warmupDispatcher,
 	}
 
@@ -211,12 +213,14 @@ type segmentLoader struct {
 	manager *Manager
 	cm      storage.ChunkManager
 
-	mut sync.Mutex
 	// The channel will be closed as the segment loaded
-	loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+	loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+
+	mut                       sync.Mutex // guards committedResource
 	committedResource         LoadResource
 	committedResourceNotifier *syncutil.VersionedNotifier
 
+	duf              *diskUsageFetcher
 	warmupDispatcher *AsyncWarmupDispatcher
 }
 
@@ -390,8 +394,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
 	log := log.Ctx(ctx).With(
 		zap.Stringer("segmentType", segmentType),
 	)
-	loader.mut.Lock()
-	defer loader.mut.Unlock()
 
 	// filter out loaded & loading segments
 	infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
@@ -414,8 +416,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
 }
 
 func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
-	loader.mut.Lock()
-	defer loader.mut.Unlock()
 	for i := range segments {
 		result, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
 		if ok {
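prepare and unregister no longer need the loader mutex because loadingSegments is a concurrent map whose insert and remove operations are themselves atomic. A minimal sketch of the same idea with the standard library's sync.Map (the Milvus code uses its own typeutil.ConcurrentMap, so this is only an analogy, not the actual API):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var loading sync.Map // segmentID -> placeholder load result

	// register: insert only if the segment is not already loading (atomic check-and-set).
	if _, loaded := loading.LoadOrStore(int64(100), "loading"); loaded {
		fmt.Println("segment 100 already loading, skip")
	}

	// unregister: fetch and remove in one atomic step, no extra mutex required.
	if result, ok := loading.LoadAndDelete(int64(100)); ok {
		fmt.Println("removed loading entry:", result)
	}
}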
@@ -450,6 +450,15 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 		zap.Int64s("segmentIDs", segmentIDs),
 	)
 
+	memoryUsage := hardware.GetUsedMemoryCount()
+	totalMemory := hardware.GetMemoryCount()
+
+	diskUsage, err := loader.duf.GetDiskUsage()
+	if err != nil {
+		return requestResourceResult{}, errors.Wrap(err, "get local used size failed")
+	}
+	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
+
 	loader.mut.Lock()
 	defer loader.mut.Unlock()
 
@@ -457,15 +466,6 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 		CommittedResource: loader.committedResource,
 	}
 
-	memoryUsage := hardware.GetUsedMemoryCount()
-	totalMemory := hardware.GetMemoryCount()
-
-	diskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
-	if err != nil {
-		return result, errors.Wrap(err, "get local used size failed")
-	}
-	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
-
 	if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
 		return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
 	} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
@@ -473,7 +473,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 
 	result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))
-	mu, du, err := loader.checkSegmentSize(ctx, infos)
+	mu, du, err := loader.checkSegmentSize(ctx, infos, memoryUsage, totalMemory, diskUsage)
 	if err != nil {
 		log.Warn("no sufficient resource to load segments", zap.Error(err))
 		return result, err
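The reordering above follows a common narrowing-the-critical-section pattern: gather the expensive inputs (memory counters, the cached disk usage) before taking the mutex, then hold the lock only long enough to check and commit against the shared budget. A self-contained sketch of that pattern with hypothetical names, not the Milvus types:

package main

import (
	"fmt"
	"sync"
)

type resourceBudget struct {
	mu        sync.Mutex // guards committed only
	committed uint64
}

// tryCommit measures outside the lock, then holds the lock just long enough
// to check the budget and record the reservation.
func (b *resourceBudget) tryCommit(request, capacity uint64, measureUsed func() uint64) error {
	used := measureUsed() // potentially slow; deliberately done before locking

	b.mu.Lock()
	defer b.mu.Unlock()
	if b.committed+used+request > capacity {
		return fmt.Errorf("insufficient resource: committed=%d used=%d request=%d cap=%d",
			b.committed, used, request, capacity)
	}
	b.committed += request
	return nil
}

func main() {
	b := &resourceBudget{}
	fmt.Println("first commit:", b.tryCommit(100, 1000, func() uint64 { return 300 }))  // fits
	fmt.Println("second commit:", b.tryCommit(700, 1000, func() uint64 { return 300 })) // rejected
}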
@@ -1354,7 +1354,7 @@ func JoinIDPath(ids ...int64) string {
 // checkSegmentSize checks whether the memory & disk is sufficient to load the segments
 // returns the memory & disk usage while loading if possible to load,
 // otherwise, returns error
-func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo) (uint64, uint64, error) {
+func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, memUsage, totalMem uint64, localDiskUsage int64) (uint64, uint64, error) {
 	if len(segmentLoadInfos) == 0 {
 		return 0, 0, nil
 	}
@@ -1367,18 +1367,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
 		return float64(mem) / 1024 / 1024
 	}
 
-	memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize
-	totalMem := hardware.GetMemoryCount()
+	memUsage = memUsage + loader.committedResource.MemorySize
 	if memUsage == 0 || totalMem == 0 {
 		return 0, 0, errors.New("get memory failed when checkSegmentSize")
 	}
-
-	localDiskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
-	if err != nil {
-		return 0, 0, errors.Wrap(err, "get local used size failed")
-	}
-
 	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage)))
 	diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
 
 	factor := resourceEstimateFactor{
@@ -327,7 +327,7 @@ func (node *QueryNode) Init() error {
 		node.factory.Init(paramtable.Get())
 
 		localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
-		localUsedSize, err := segcore.GetLocalUsedSize(node.ctx, localRootPath)
+		localUsedSize, err := segcore.GetLocalUsedSize(localRootPath)
 		if err != nil {
 			log.Warn("get local used size failed", zap.Error(err))
 			initError = err
@@ -28,7 +28,6 @@ package segcore
 import "C"
 
 import (
-	"context"
 	"math"
 	"reflect"
 	"unsafe"
@@ -73,7 +72,7 @@ func getCProtoBlob(cProto *C.CProto) []byte {
 }
 
 // GetLocalUsedSize returns the used size of the local path
-func GetLocalUsedSize(ctx context.Context, path string) (int64, error) {
+func GetLocalUsedSize(path string) (int64, error) {
 	var availableSize int64
 	cSize := (*C.int64_t)(&availableSize)
 	cPath := C.CString(path)
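Dropping the unused ctx parameter is the whole change here; the size itself is still computed by segcore's C layer, which this diff does not show. For intuition only, a pure-Go equivalent of "used size of a local path" could look like the sketch below (an assumption about the semantics, not the real implementation):

package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// localUsedSize sums the sizes of regular files under path. Purely illustrative;
// the real GetLocalUsedSize delegates to C code.
func localUsedSize(path string) (int64, error) {
	var total int64
	err := filepath.WalkDir(path, func(_ string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.Type().IsRegular() {
			info, err := d.Info()
			if err != nil {
				return err
			}
			total += info.Size()
		}
		return nil
	})
	return total, err
}

func main() {
	size, err := localUsedSize(os.TempDir())
	fmt.Println("used bytes:", size, "err:", err)
}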
@@ -1,7 +1,6 @@
 package segcore
 
 import (
-	"context"
 	"runtime"
 	"testing"
 
@@ -17,7 +16,7 @@ func TestConsumeCStatusIntoError(t *testing.T) {
 }
 
 func TestGetLocalUsedSize(t *testing.T) {
-	size, err := GetLocalUsedSize(context.Background(), "")
+	size, err := GetLocalUsedSize("")
 	assert.NoError(t, err)
 	assert.NotNil(t, size)
 }
@@ -2528,6 +2528,7 @@ type queryNodeConfig struct {
 	// loader
 	IoPoolSize             ParamItem `refreshable:"false"`
 	DeltaDataExpansionRate ParamItem `refreshable:"true"`
+	DiskSizeFetchInterval  ParamItem `refreshable:"false"`
 
 	// schedule task policy.
 	SchedulePolicyName ParamItem `refreshable:"false"`
@@ -3147,6 +3148,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
 	}
 	p.DeltaDataExpansionRate.Init(base.mgr)
 
+	p.DiskSizeFetchInterval = ParamItem{
+		Key:          "querynode.diskSizeFetchInterval",
+		Version:      "2.5.0",
+		DefaultValue: "60",
+		Doc:          "The time interval in seconds for retrieving disk usage.",
+	}
+	p.DiskSizeFetchInterval.Init(base.mgr)
+
 	// schedule read task policy.
 	p.SchedulePolicyName = ParamItem{
 		Key: "queryNode.scheduler.scheduleReadPolicy.name",
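For reference on how the new setting is consumed: the fetcher's Start loop calls GetAsDuration(time.Second), which scales the numeric value by the given unit, so the DefaultValue of "60" becomes a 60-second interval (matching the test expectation below). A self-contained sketch of that conversion using my own helper, not the paramtable implementation:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// asDuration mimics reading a numeric setting and scaling it by a unit,
// e.g. "60" with time.Second -> 1m0s. Illustrative only.
func asDuration(raw string, unit time.Duration) time.Duration {
	n, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return 0
	}
	return time.Duration(n * float64(unit))
}

func main() {
	fmt.Println(asDuration("60", time.Second)) // 1m0s
}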
@@ -485,6 +485,7 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())
 
 		assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())
+		assert.Equal(t, 60*time.Second, Params.DiskSizeFetchInterval.GetAsDuration(time.Second))
 	})
 
 	t.Run("test dataCoordConfig", func(t *testing.T) {