diff --git a/internal/querynodev2/segments/disk_usage_fetcher.go b/internal/querynodev2/segments/disk_usage_fetcher.go
new file mode 100644
index 0000000000..ca4aeecaee
--- /dev/null
+++ b/internal/querynodev2/segments/disk_usage_fetcher.go
@@ -0,0 +1,81 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package segments
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "go.uber.org/atomic"
+    "go.uber.org/zap"
+
+    "github.com/milvus-io/milvus/internal/util/segcore"
+    "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/metrics"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+type diskUsageFetcher struct {
+    ctx   context.Context
+    path  string
+    usage *atomic.Int64
+    err   *atomic.Error
+}
+
+func NewDiskUsageFetcher(ctx context.Context) *diskUsageFetcher {
+    return &diskUsageFetcher{
+        ctx:   ctx,
+        path:  paramtable.Get().LocalStorageCfg.Path.GetValue(),
+        usage: atomic.NewInt64(0),
+        err:   atomic.NewError(nil),
+    }
+}
+
+func (d *diskUsageFetcher) GetDiskUsage() (int64, error) {
+    return d.usage.Load(), d.err.Load()
+}
+
+func (d *diskUsageFetcher) fetch() {
+    diskUsage, err := segcore.GetLocalUsedSize(d.path)
+    if err != nil {
+        log.Warn("failed to get disk usage", zap.Error(err))
+        d.err.Store(err)
+        return
+    }
+    d.usage.Store(diskUsage)
+    d.err.Store(nil)
+    metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(diskUsage) / 1024 / 1024) // in MB
+    log.Ctx(d.ctx).WithRateGroup("diskUsageFetcher", 1, 300).
+        RatedInfo(300, "querynode disk usage", zap.Int64("size", diskUsage), zap.Int64("nodeID", paramtable.GetNodeID()))
+}
+
+func (d *diskUsageFetcher) Start() {
+    d.fetch() // Synchronously fetch once before starting.
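Note on the fetcher above: it caches the most recent measurement behind atomics, so GetDiskUsage never blocks on the filesystem; a single background goroutine refreshes the cached value on a ticker after one synchronous fetch. The following is a minimal standalone sketch of that caching pattern, not Milvus code; cachedFetcher, its probe callback, and the interval are hypothetical names used only for illustration.

package main

import (
    "context"
    "fmt"
    "time"

    "go.uber.org/atomic"
)

// cachedFetcher refreshes a value in the background; readers only load atomics.
type cachedFetcher struct {
    value *atomic.Int64
    err   *atomic.Error
}

func (c *cachedFetcher) run(ctx context.Context, interval time.Duration, probe func() (int64, error)) {
    refresh := func() {
        v, err := probe()
        if err != nil {
            c.err.Store(err)
            return
        }
        c.value.Store(v)
        c.err.Store(nil)
    }
    refresh() // synchronous first fetch, mirroring d.fetch() before the ticker loop
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            refresh()
        }
    }
}

func (c *cachedFetcher) Get() (int64, error) { return c.value.Load(), c.err.Load() }

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    f := &cachedFetcher{value: atomic.NewInt64(0), err: atomic.NewError(nil)}
    go f.run(ctx, time.Second, func() (int64, error) { return 42, nil }) // hypothetical probe
    time.Sleep(50 * time.Millisecond)
    fmt.Println(f.Get()) // 42 <nil>
}

One consequence of running Start in a goroutine (go duf.Start() in NewLoader below) is a short window where GetDiskUsage can still return the zero value with a nil error before the first fetch completes.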
+
+    interval := paramtable.Get().QueryNodeCfg.DiskSizeFetchInterval.GetAsDuration(time.Second)
+    ticker := time.NewTicker(interval)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-d.ctx.Done():
+            return
+        case <-ticker.C:
+            d.fetch()
+        }
+    }
+}
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index 5b05824adb..802643d958 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -1294,11 +1294,6 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
     GetDynamicPool().Submit(func() (any, error) {
         C.DeleteSegment(ptr)
 
-        localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
-        // ignore error here, shall not block releasing
-        if err == nil {
-            metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
-        }
         return nil, nil
     }).Await()
 
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index fc44d7db6c..7c8c8ab5e8 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -44,7 +44,6 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
     "github.com/milvus-io/milvus/internal/storage"
-    "github.com/milvus-io/milvus/internal/util/segcore"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/metrics"
@@ -167,6 +166,8 @@ func NewLoader(
     }
     log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
 
+    duf := NewDiskUsageFetcher(ctx)
+    go duf.Start()
     warmupDispatcher := NewWarmupDispatcher()
     go warmupDispatcher.Run(ctx)
 
@@ -175,6 +176,7 @@ func NewLoader(
         cm:                        cm,
         loadingSegments:           typeutil.NewConcurrentMap[int64, *loadResult](),
         committedResourceNotifier: syncutil.NewVersionedNotifier(),
+        duf:                       duf,
         warmupDispatcher:          warmupDispatcher,
     }
 
@@ -211,12 +213,14 @@ type segmentLoader struct {
     manager *Manager
     cm      storage.ChunkManager
 
-    mut sync.Mutex
     // The channel will be closed as the segment loaded
-    loadingSegments   *typeutil.ConcurrentMap[int64, *loadResult]
+    loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+
+    mut sync.Mutex // guards committedResource
     committedResource         LoadResource
     committedResourceNotifier *syncutil.VersionedNotifier
 
+    duf              *diskUsageFetcher
     warmupDispatcher *AsyncWarmupDispatcher
 }
 
@@ -390,8 +394,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
     log := log.Ctx(ctx).With(
         zap.Stringer("segmentType", segmentType),
     )
-    loader.mut.Lock()
-    defer loader.mut.Unlock()
 
     // filter out loaded & loading segments
     infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
@@ -414,8 +416,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
 }
 
 func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
-    loader.mut.Lock()
-    defer loader.mut.Unlock()
     for i := range segments {
         result, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
         if ok {
@@ -450,6 +450,15 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
         zap.Int64s("segmentIDs", segmentIDs),
     )
 
+    memoryUsage := hardware.GetUsedMemoryCount()
+    totalMemory := hardware.GetMemoryCount()
+
+    diskUsage, err := loader.duf.GetDiskUsage()
+    if err != nil {
+        return requestResourceResult{}, errors.Wrap(err, "get local used size failed")
+    }
+    diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
+
     loader.mut.Lock()
     defer loader.mut.Unlock()
 
@@ -457,15 +466,6 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
         CommittedResource: loader.committedResource,
     }
 
-    memoryUsage := hardware.GetUsedMemoryCount()
-    totalMemory := hardware.GetMemoryCount()
-
-    diskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
-    if err != nil {
-        return result, errors.Wrap(err, "get local used size failed")
-    }
-    diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
-
     if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
         return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
     } else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
@@ -473,7 +473,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
     }
 
     result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))
-    mu, du, err := loader.checkSegmentSize(ctx, infos)
+    mu, du, err := loader.checkSegmentSize(ctx, infos, memoryUsage, totalMemory, diskUsage)
     if err != nil {
         log.Warn("no sufficient resource to load segments", zap.Error(err))
         return result, err
@@ -1354,7 +1354,7 @@ func JoinIDPath(ids ...int64) string {
 // checkSegmentSize checks whether the memory & disk is sufficient to load the segments
 // returns the memory & disk usage while loading if possible to load,
 // otherwise, returns error
-func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo) (uint64, uint64, error) {
+func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, memUsage, totalMem uint64, localDiskUsage int64) (uint64, uint64, error) {
     if len(segmentLoadInfos) == 0 {
         return 0, 0, nil
     }
@@ -1367,18 +1367,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
         return float64(mem) / 1024 / 1024
     }
 
-    memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize
-    totalMem := hardware.GetMemoryCount()
+    memUsage = memUsage + loader.committedResource.MemorySize
     if memUsage == 0 || totalMem == 0 {
         return 0, 0, errors.New("get memory failed when checkSegmentSize")
     }
 
-    localDiskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
-    if err != nil {
-        return 0, 0, errors.Wrap(err, "get local used size failed")
-    }
-
-    metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage)))
     diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
 
     factor := resourceEstimateFactor{
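Note on the locking change above: the memory counters and the (now cached) disk usage are gathered before loader.mut is taken, and the mutex only guards committedResource; prepare() and unregister() drop it entirely since loadingSegments is already a concurrent map. A simplified sketch of that lock-scoping pattern, with hypothetical loader/resource types standing in for the real ones:

package main

import (
    "fmt"
    "sync"
    "time"
)

type resource struct{ mem, disk uint64 }

type loader struct {
    mu        sync.Mutex // guards committed only
    committed resource
}

// slowProbe stands in for memory/disk measurements that may take a while.
func slowProbe() resource {
    time.Sleep(10 * time.Millisecond)
    return resource{mem: 1 << 20, disk: 1 << 30}
}

func (l *loader) request(need, capacity resource) bool {
    usage := slowProbe() // blocking work happens before the lock is held

    l.mu.Lock()
    defer l.mu.Unlock()
    if l.committed.mem+usage.mem+need.mem > capacity.mem ||
        l.committed.disk+usage.disk+need.disk > capacity.disk {
        return false
    }
    l.committed.mem += need.mem
    l.committed.disk += need.disk
    return true
}

func main() {
    l := &loader{}
    ok := l.request(resource{mem: 1 << 10, disk: 1 << 12}, resource{mem: 1 << 33, disk: 1 << 40})
    fmt.Println(ok) // true
}

Because the usage numbers are sampled outside the lock, two concurrent requestResource calls can observe the same snapshot; it is the committedResource accounting under the mutex that keeps them from over-admitting.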
diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go
index 0a2da16112..55ff9abc3a 100644
--- a/internal/querynodev2/server.go
+++ b/internal/querynodev2/server.go
@@ -327,7 +327,7 @@ func (node *QueryNode) Init() error {
         node.factory.Init(paramtable.Get())
 
         localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
-        localUsedSize, err := segcore.GetLocalUsedSize(node.ctx, localRootPath)
+        localUsedSize, err := segcore.GetLocalUsedSize(localRootPath)
         if err != nil {
             log.Warn("get local used size failed", zap.Error(err))
             initError = err
diff --git a/internal/util/segcore/cgo_util.go b/internal/util/segcore/cgo_util.go
index c7cff7d847..9a4298b38e 100644
--- a/internal/util/segcore/cgo_util.go
+++ b/internal/util/segcore/cgo_util.go
@@ -28,7 +28,6 @@ package segcore
 import "C"
 
 import (
-    "context"
     "math"
     "reflect"
     "unsafe"
@@ -73,7 +72,7 @@ func getCProtoBlob(cProto *C.CProto) []byte {
 }
 
 // GetLocalUsedSize returns the used size of the local path
-func GetLocalUsedSize(ctx context.Context, path string) (int64, error) {
+func GetLocalUsedSize(path string) (int64, error) {
     var availableSize int64
     cSize := (*C.int64_t)(&availableSize)
     cPath := C.CString(path)
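Note on the signature change above: GetLocalUsedSize appears never to have used its context (the body shown is unchanged), so dropping the parameter only touches call sites. For reference, a rough pure-Go illustration of the value it reports, i.e. bytes used under the local storage path; this is only a sketch, the real implementation goes through cgo rather than filepath.WalkDir:

package main

import (
    "fmt"
    "io/fs"
    "os"
    "path/filepath"
)

// localUsedSize walks a directory tree and sums the sizes of regular files.
func localUsedSize(path string) (int64, error) {
    var total int64
    err := filepath.WalkDir(path, func(_ string, d fs.DirEntry, err error) error {
        if err != nil {
            return err
        }
        if d.Type().IsRegular() {
            info, err := d.Info()
            if err != nil {
                return err
            }
            total += info.Size()
        }
        return nil
    })
    return total, err
}

func main() {
    size, err := localUsedSize(os.TempDir())
    fmt.Println(size, err)
}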
diff --git a/internal/util/segcore/cgo_util_test.go b/internal/util/segcore/cgo_util_test.go
index e2b8e997ab..e5486aa4ca 100644
--- a/internal/util/segcore/cgo_util_test.go
+++ b/internal/util/segcore/cgo_util_test.go
@@ -1,7 +1,6 @@
 package segcore
 
 import (
-    "context"
     "runtime"
     "testing"
 
@@ -17,7 +16,7 @@ func TestConsumeCStatusIntoError(t *testing.T) {
 }
 
 func TestGetLocalUsedSize(t *testing.T) {
-    size, err := GetLocalUsedSize(context.Background(), "")
+    size, err := GetLocalUsedSize("")
     assert.NoError(t, err)
     assert.NotNil(t, size)
 }
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 2ad93b1f21..7d9c329b49 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2528,6 +2528,7 @@ type queryNodeConfig struct {
     // loader
     IoPoolSize             ParamItem `refreshable:"false"`
     DeltaDataExpansionRate ParamItem `refreshable:"true"`
+    DiskSizeFetchInterval  ParamItem `refreshable:"false"`
 
     // schedule task policy.
     SchedulePolicyName ParamItem `refreshable:"false"`
@@ -3147,6 +3148,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
     }
     p.DeltaDataExpansionRate.Init(base.mgr)
 
+    p.DiskSizeFetchInterval = ParamItem{
+        Key:          "querynode.diskSizeFetchInterval",
+        Version:      "2.5.0",
+        DefaultValue: "60",
+        Doc:          "The time interval in seconds for retrieving disk usage.",
+    }
+    p.DiskSizeFetchInterval.Init(base.mgr)
+
     // schedule read task policy.
     p.SchedulePolicyName = ParamItem{
         Key: "queryNode.scheduler.scheduleReadPolicy.name",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 5a026f7b6d..7f1b5c0e25 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -485,6 +485,7 @@ func TestComponentParam(t *testing.T) {
         assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())
         assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())
+        assert.Equal(t, 60*time.Second, Params.DiskSizeFetchInterval.GetAsDuration(time.Second))
     })
 
     t.Run("test dataCoordConfig", func(t *testing.T) {
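Note on the new setting: with DefaultValue "60" and GetAsDuration(time.Second), diskSizeFetchInterval resolves to one minute, which is what the updated test asserts. A simplified mimic of that conversion for reference; the real ParamItem.GetAsDuration has richer parsing, and this sketch assumes plain integer values:

package main

import (
    "fmt"
    "strconv"
    "time"
)

// getAsDuration treats the raw config string as a count of the given unit.
func getAsDuration(raw string, unit time.Duration) time.Duration {
    v, err := strconv.ParseInt(raw, 10, 64)
    if err != nil {
        return 0
    }
    return time.Duration(v) * unit
}

func main() {
    fmt.Println(getAsDuration("60", time.Second)) // 1m0s
}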