fix: LRU-related issue fixup patch (#32916)

issue: #32206, #32801

- search failures with assertion errors, segment not loaded, and resource insufficient.

- segment leak when querying segments (see the sketch below).
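
Below is a minimal sketch of the pin/unpin contract behind the query-path leak fix; Segment and getAndPin are simplified stand-ins, not the real querynode types, and pinning is only pretended.

// Hedged sketch: simplified stand-ins, not the actual Milvus segment APIs.
package main

import "fmt"

type Segment struct{ id int64 }

func (s *Segment) Unpin() { fmt.Printf("unpin segment %d\n", s.id) }

// getAndPin pins the requested segments. If any segment is missing, the
// deferred cleanup unpins what was already pinned and clears the result,
// so the caller never receives live pins together with an error.
func getAndPin(loaded map[int64]*Segment, ids []int64) (ret []*Segment, err error) {
    defer func() {
        if err != nil {
            for _, s := range ret {
                s.Unpin()
            }
            ret = nil
        }
    }()
    for _, id := range ids {
        s, ok := loaded[id]
        if !ok {
            // Returning err here (instead of a stale slice with a nil error)
            // is what keeps the caller's deferred Unpin leak-free.
            return ret, fmt.Errorf("segment %d not loaded", id)
        }
        ret = append(ret, s) // pretend this pins the segment
    }
    return ret, nil
}

func main() {
    loaded := map[int64]*Segment{1: {id: 1}, 2: {id: 2}}
    segs, err := getAndPin(loaded, []int64{1, 2, 3})
    // The caller unpins whatever came back, error or not; on error the slice
    // is empty, so nothing leaks and nothing is unpinned twice.
    defer func() {
        for _, s := range segs {
            s.Unpin()
        }
    }()
    fmt.Println("pinned:", len(segs), "err:", err)
}

This mirrors the GetAndPinBy/GetAndPin cleanup and the Retrieve signature change in the diff below, which now hand pinned segments back to the caller even when an error occurs.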

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/32961/head
chyezh 2024-05-10 19:17:30 +08:00 committed by GitHub
parent 25689859a1
commit 1c84a1c9b6
10 changed files with 210 additions and 134 deletions

View File

@ -49,7 +49,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@ -1763,28 +1762,14 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
}))
server.handleNodeUp(111)
// wait for async update by observer
time.Sleep(100 * time.Millisecond)
suite.Eventually(func() bool {
nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes()
nodesInRG, _ := suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
return len(nodes) == len(nodesInRG)
}, 5*time.Second, 100*time.Millisecond)
nodes := suite.server.meta.ReplicaManager.Get(1).GetNodes()
nodesInRG, _ := suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.ElementsMatch(nodes, nodesInRG)
log.Info("handleNodeUp")
// when more resource groups exist, a new node shouldn't be assigned to a replica in the default rg in handleNodeUp
suite.server.meta.ResourceManager.AddResourceGroup("rg", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{NodeNum: 1},
Limits: &rgpb.ResourceGroupLimit{NodeNum: 1},
})
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 222,
Address: "localhost",
Hostname: "localhost",
}))
server.handleNodeUp(222)
// wait for async update by observer
time.Sleep(100 * time.Millisecond)
nodes = suite.server.meta.ReplicaManager.Get(1).GetNodes()
nodesInRG, _ = suite.server.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.ElementsMatch(nodes, nodesInRG)
}
func (suite *ServiceSuite) loadAll() {

View File

@ -241,7 +241,10 @@ func NewManager() *Manager {
segMgr.registerReleaseCallback(func(s Segment) {
if s.Type() == SegmentTypeSealed {
manager.DiskCache.Expire(context.Background(), s.ID())
// !!! We cannot use the request ctx to call Remove:
// once that context is canceled, the segment will leak in the cache forever,
// because it has already been cleaned from the segment manager.
manager.DiskCache.Remove(context.Background(), s.ID())
}
})
@ -417,6 +420,7 @@ func (mgr *segmentManager) GetAndPinBy(filters ...SegmentFilter) ([]Segment, err
for _, segment := range ret {
segment.Unpin()
}
ret = nil
}
}()
@ -432,7 +436,7 @@ func (mgr *segmentManager) GetAndPinBy(filters ...SegmentFilter) ([]Segment, err
return true
}, filters...)
return ret, nil
return ret, err
}
func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter) ([]Segment, error) {
@ -446,6 +450,7 @@ func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter)
for _, segment := range lockedSegments {
segment.Unpin()
}
lockedSegments = nil
}
}()
@ -729,10 +734,11 @@ func (mgr *segmentManager) updateMetric() {
}
func (mgr *segmentManager) remove(ctx context.Context, segment Segment) bool {
segment.Release(ctx)
if mgr.releaseCallback != nil {
mgr.releaseCallback(segment)
log.Ctx(ctx).Info("remove segment from cache", zap.Int64("segmentID", segment.ID()))
}
segment.Release(ctx)
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),

View File

@ -38,14 +38,15 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type RetrieveSegmentResult struct {
Result *segcorepb.RetrieveResults
Segment Segment
}
// retrieveOnSegments performs retrieve on listed segments
// all segment ids are validated before calling this function
func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, segType SegmentType, plan *RetrievePlan, req *querypb.QueryRequest) ([]*segcorepb.RetrieveResults, []Segment, error) {
type segmentResult struct {
result *segcorepb.RetrieveResults
segment Segment
}
resultCh := make(chan segmentResult, len(segments))
func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, segType SegmentType, plan *RetrievePlan, req *querypb.QueryRequest) ([]RetrieveSegmentResult, error) {
resultCh := make(chan RetrieveSegmentResult, len(segments))
// TODO(longjiquan): remove this limit after two-phase retrieval can be applied on lru-segment.
plan.ignoreNonPk = !paramtable.Get().QueryNodeCfg.UseStreamComputing.GetAsBool() &&
@ -62,7 +63,7 @@ func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, s
if err != nil {
return err
}
resultCh <- segmentResult{
resultCh <- RetrieveSegmentResult{
result,
s,
}
@ -107,17 +108,14 @@ func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, s
close(resultCh)
if err != nil {
return nil, nil, err
return nil, err
}
var retrieveSegments []Segment
var retrieveResults []*segcorepb.RetrieveResults
for result := range resultCh {
retrieveSegments = append(retrieveSegments, result.segment)
retrieveResults = append(retrieveResults, result.result)
results := make([]RetrieveSegmentResult, 0, len(segments))
for r := range resultCh {
results = append(results, r)
}
return retrieveResults, retrieveSegments, nil
return results, nil
}
func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segType SegmentType, plan *RetrievePlan, svr streamrpc.QueryStreamServer) error {
@ -166,7 +164,7 @@ func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segTy
}
// Retrieve will retrieve all the valid target segments
func Retrieve(ctx context.Context, manager *Manager, plan *RetrievePlan, req *querypb.QueryRequest) ([]*segcorepb.RetrieveResults, []Segment, error) {
func Retrieve(ctx context.Context, manager *Manager, plan *RetrievePlan, req *querypb.QueryRequest) ([]RetrieveSegmentResult, []Segment, error) {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
@ -188,10 +186,11 @@ func Retrieve(ctx context.Context, manager *Manager, plan *RetrievePlan, req *qu
}
if err != nil {
return nil, nil, err
return nil, retrieveSegments, err
}
return retrieveOnSegments(ctx, manager, retrieveSegments, SegType, plan, req)
result, err := retrieveOnSegments(ctx, manager, retrieveSegments, SegType, plan, req)
return result, retrieveSegments, err
}
// retrieveStreaming will retrieve all the valid target segments and return results by stream

View File

@ -161,7 +161,7 @@ func (suite *RetrieveSuite) TestRetrieveSealed() {
res, segments, err := Retrieve(context.TODO(), suite.manager, plan, req)
suite.NoError(err)
suite.Len(res[0].Offset, 3)
suite.Len(res[0].Result.Offset, 3)
suite.manager.Segment.Unpin(segments)
}
@ -180,7 +180,7 @@ func (suite *RetrieveSuite) TestRetrieveGrowing() {
res, segments, err := Retrieve(context.TODO(), suite.manager, plan, req)
suite.NoError(err)
suite.Len(res[0].Offset, 3)
suite.Len(res[0].Result.Offset, 3)
suite.manager.Segment.Unpin(segments)
}

View File

@ -59,6 +59,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -67,6 +68,8 @@ const (
UsedDiskMemoryRatio = 4
)
var errRetryTimerNotified = errors.New("retry timer notified")
type Loader interface {
// Load loads binlogs and spawns segments,
// NOTE: make sure the ref count of the corresponding collection will never go down to 0 during this
@ -91,6 +94,12 @@ type Loader interface {
) error
}
type requestResourceResult struct {
Resource LoadResource
CommittedResource LoadResource
ConcurrencyLevel int
}
type LoadResource struct {
MemorySize uint64
DiskSize uint64
@ -106,6 +115,10 @@ func (r *LoadResource) Sub(resource LoadResource) {
r.DiskSize -= resource.DiskSize
}
func (r *LoadResource) IsZero() bool {
return r.MemorySize == 0 && r.DiskSize == 0
}
type resourceEstimateFactor struct {
memoryUsageFactor float64
memoryIndexUsageFactor float64
@ -165,12 +178,12 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))
// Check memory & storage limit
resource, concurrencyLevel, err := loader.requestResource(ctx, infos...)
requestResourceResult, err := loader.requestResource(ctx, infos...)
if err != nil {
log.Warn("request resource failed", zap.Error(err))
return nil, err
}
defer loader.freeRequest(resource)
defer loader.freeRequest(requestResourceResult.Resource)
newSegments := typeutil.NewConcurrentMap[int64, Segment]()
loaded := typeutil.NewConcurrentMap[int64, Segment]()
@ -243,9 +256,9 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
// Make sure we can always benefit from concurrency, and not spawn too many idle goroutines
log.Info("start to load segments in parallel",
zap.Int("segmentNum", len(infos)),
zap.Int("concurrencyLevel", concurrencyLevel))
zap.Int("concurrencyLevel", requestResourceResult.ConcurrencyLevel))
err = funcutil.ProcessFuncParallel(len(infos),
concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
requestResourceResult.ConcurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
if err != nil {
log.Warn("failed to load some segments", zap.Error(err))
return nil, err
@ -535,9 +548,10 @@ func NewLoader(
log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
loader := &segmentLoader{
manager: manager,
cm: cm,
loadingSegments: typeutil.NewConcurrentMap[int64, *loadResult](),
manager: manager,
cm: cm,
loadingSegments: typeutil.NewConcurrentMap[int64, *loadResult](),
committedResourceNotifier: syncutil.NewVersionedNotifier(),
}
return loader
@ -575,8 +589,9 @@ type segmentLoader struct {
mut sync.Mutex
// The channel will be closed once the segment is loaded
loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
committedResource LoadResource
loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
committedResource LoadResource
committedResourceNotifier *syncutil.VersionedNotifier
}
var _ Loader = (*segmentLoader)(nil)
@ -609,18 +624,17 @@ func (loader *segmentLoader) Load(ctx context.Context,
log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))
var err error
var resource LoadResource
var concurrencyLevel int
var requestResourceResult requestResourceResult
coll := loader.manager.Collection.Get(collectionID)
if !isLazyLoad(coll, segmentType) {
// Check memory & storage limit
// no need to check resource for lazy load here
resource, concurrencyLevel, err = loader.requestResource(ctx, infos...)
requestResourceResult, err = loader.requestResource(ctx, infos...)
if err != nil {
log.Warn("request resource failed", zap.Error(err))
return nil, err
}
defer loader.freeRequest(resource)
defer loader.freeRequest(requestResourceResult.Resource)
}
newSegments := typeutil.NewConcurrentMap[int64, Segment]()
loaded := typeutil.NewConcurrentMap[int64, Segment]()
@ -712,10 +726,10 @@ func (loader *segmentLoader) Load(ctx context.Context,
// Make sure we can always benefit from concurrency, and not spawn too many idle goroutines
log.Info("start to load segments in parallel",
zap.Int("segmentNum", len(infos)),
zap.Int("concurrencyLevel", concurrencyLevel))
zap.Int("concurrencyLevel", requestResourceResult.ConcurrencyLevel))
err = funcutil.ProcessFuncParallel(len(infos),
concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
requestResourceResult.ConcurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
if err != nil {
log.Warn("failed to load some segments", zap.Error(err))
return nil, err
@ -786,13 +800,12 @@ func (loader *segmentLoader) notifyLoadFinish(segments ...*querypb.SegmentLoadIn
// requestResource requests memory & storage to load segments,
// returning the requested resource, the currently committed resource, and the concurrency level granted with the gained memory.
func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, int, error) {
resource := LoadResource{}
func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (requestResourceResult, error) {
// we need to deal with the empty infos case separately,
// because the following judgment on requested resources is based on the current status and static config,
// which may block empty-load operations by accident
if len(infos) == 0 {
return resource, 0, nil
return requestResourceResult{}, nil
}
segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 {
@ -805,43 +818,47 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
loader.mut.Lock()
defer loader.mut.Unlock()
result := requestResourceResult{
CommittedResource: loader.committedResource,
}
memoryUsage := hardware.GetUsedMemoryCount()
totalMemory := hardware.GetMemoryCount()
diskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
return resource, 0, errors.Wrap(err, "get local used size failed")
return result, errors.Wrap(err, "get local used size failed")
}
diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
return resource, 0, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
return resource, 0, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
return result, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
}
concurrencyLevel := funcutil.Min(hardware.GetCPUNum(), len(infos))
result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))
mu, du, err := loader.checkSegmentSize(ctx, infos)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
return resource, 0, err
return result, err
}
resource.MemorySize += mu
resource.DiskSize += du
result.Resource.MemorySize += mu
result.Resource.DiskSize += du
toMB := func(mem uint64) float64 {
return float64(mem) / 1024 / 1024
}
loader.committedResource.Add(resource)
loader.committedResource.Add(result.Resource)
log.Info("request resource for loading segments (unit in MiB)",
zap.Float64("memory", toMB(resource.MemorySize)),
zap.Float64("memory", toMB(result.Resource.MemorySize)),
zap.Float64("committedMemory", toMB(loader.committedResource.MemorySize)),
zap.Float64("disk", toMB(resource.DiskSize)),
zap.Float64("disk", toMB(result.Resource.DiskSize)),
zap.Float64("committedDisk", toMB(loader.committedResource.DiskSize)),
)
return resource, concurrencyLevel, nil
return result, nil
}
// freeRequest returns the requested memory & storage usage.
@ -850,6 +867,7 @@ func (loader *segmentLoader) freeRequest(resource LoadResource) {
defer loader.mut.Unlock()
loader.committedResource.Sub(resource)
loader.committedResourceNotifier.NotifyAll()
}
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs []int64, version int64) error {
@ -990,7 +1008,7 @@ func separateIndexAndBinlog(loadInfo *querypb.SegmentLoadInfo) (map[int64]*Index
return indexedFieldInfos, fieldBinlogs
}
func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *querypb.SegmentLoadInfo, segment *LocalSegment) error {
func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *querypb.SegmentLoadInfo, segment *LocalSegment) (err error) {
// TODO: we should create a transaction-like api to load segment for segment interface,
// but not do many things in segment loader.
stateLockGuard, err := segment.StartLoadData()
@ -1120,7 +1138,7 @@ func (loader *segmentLoader) LoadLazySegment(ctx context.Context,
segment *LocalSegment,
loadInfo *querypb.SegmentLoadInfo,
) (err error) {
resource, _, err := loader.requestResourceWithTimeout(ctx, loadInfo)
resource, err := loader.requestResourceWithTimeout(ctx, loadInfo)
if err != nil {
log.Ctx(ctx).Warn("request resource failed", zap.Error(err))
return err
@ -1131,23 +1149,36 @@ func (loader *segmentLoader) LoadLazySegment(ctx context.Context,
}
// requestResourceWithTimeout requests memory & storage to load segments with a timeout and retry.
func (loader *segmentLoader) requestResourceWithTimeout(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, int, error) {
timeout := paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceTimeout.GetAsDuration(time.Millisecond)
func (loader *segmentLoader) requestResourceWithTimeout(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, error) {
retryInterval := paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond)
// TODO: use context.WithTimeoutCause instead of contextutil.WithTimeoutCause in go1.21
ctx, cancel := contextutil.WithTimeoutCause(ctx, timeout, merr.ErrServiceResourceInsufficient)
defer cancel()
timeoutStarted := false
for {
resource, concurrencyLevel, err := loader.requestResource(ctx, infos...)
listener := loader.committedResourceNotifier.Listen(syncutil.VersionedListenAtLatest)
result, err := loader.requestResource(ctx, infos...)
if err == nil {
return resource, concurrencyLevel, nil
return result.Resource, nil
}
select {
case <-ctx.Done():
return LoadResource{}, -1, context.Cause(ctx)
case <-time.After(retryInterval):
// start the timeout only if no other load holds committed resources.
if !timeoutStarted && result.CommittedResource.IsZero() {
timeout := paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceTimeout.GetAsDuration(time.Millisecond)
var cancel context.CancelFunc
// TODO: use context.WithTimeoutCause instead of contextutil.WithTimeoutCause in go1.21
ctx, cancel = contextutil.WithTimeoutCause(ctx, timeout, merr.ErrServiceResourceInsufficient)
defer cancel()
timeoutStarted = true
}
// TODO: use context.WithTimeoutCause instead of contextutil.WithTimeoutCause in go1.21
ctxWithRetryTimeout, cancelWithRetryTimeout := contextutil.WithTimeoutCause(ctx, retryInterval, errRetryTimerNotified)
err = listener.Wait(ctxWithRetryTimeout)
// if error is not caused by retry timeout, return it directly.
if err != nil && !errors.Is(err, errRetryTimerNotified) {
cancelWithRetryTimeout()
return LoadResource{}, err
}
cancelWithRetryTimeout()
}
}
@ -1655,11 +1686,11 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
info.Statslogs = nil
return info
})
resource, _, err := loader.requestResource(ctx, indexInfo...)
requestResourceResult, err := loader.requestResource(ctx, indexInfo...)
if err != nil {
return err
}
defer loader.freeRequest(resource)
defer loader.freeRequest(requestResourceResult.Resource)
log.Info("segment loader start to load index", zap.Int("segmentNumAfterFilter", len(infos)))
metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadIndex").Inc()

View File

@ -26,6 +26,7 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
@ -41,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/contextutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -847,31 +849,55 @@ func (suite *SegmentLoaderDetailSuite) TestRequestResource() {
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.Key, "0")
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.Key)
_, _, err := suite.loader.requestResource(context.Background())
_, err := suite.loader.requestResource(context.Background())
suite.NoError(err)
})
loadInfo := &querypb.SegmentLoadInfo{
SegmentID: 100,
CollectionID: suite.collectionID,
Level: datapb.SegmentLevel_L0,
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogSize: 10000},
{LogSize: 12000},
},
},
},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
}
suite.Run("l0_segment_deltalog", func() {
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.Key, "50")
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.Key)
resource, _, err := suite.loader.requestResource(context.Background(), &querypb.SegmentLoadInfo{
SegmentID: 100,
CollectionID: suite.collectionID,
Level: datapb.SegmentLevel_L0,
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{LogSize: 10000},
{LogSize: 12000},
},
},
},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
})
resource, err := suite.loader.requestResource(context.Background(), loadInfo)
suite.NoError(err)
suite.EqualValues(1100000, resource.Resource.MemorySize)
})
suite.Run("request_resource_with_timeout", func() {
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.Key, "50")
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.Key)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceTimeout.Key, "500")
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceRetryInterval.Key, "100")
resource, err := suite.loader.requestResourceWithTimeout(context.Background(), loadInfo)
suite.NoError(err)
suite.EqualValues(1100000, resource.MemorySize)
suite.loader.committedResource.Add(LoadResource{
MemorySize: 1024 * 1024 * 1024 * 1024,
})
timeoutErr := errors.New("timeout")
ctx, cancel := contextutil.WithTimeoutCause(context.Background(), 1000*time.Millisecond, timeoutErr)
defer cancel()
resource, err = suite.loader.requestResourceWithTimeout(ctx, loadInfo)
suite.Error(err)
suite.ErrorIs(err, timeoutErr)
})
}

View File

@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/metrics"
@ -113,8 +114,8 @@ func (t *QueryTask) Execute() error {
return err
}
defer retrievePlan.Delete()
results, querySegments, err := segments.Retrieve(t.ctx, t.segmentManager, retrievePlan, t.req)
defer t.segmentManager.Segment.Unpin(querySegments)
results, pinnedSegments, err := segments.Retrieve(t.ctx, t.segmentManager, retrievePlan, t.req)
defer t.segmentManager.Segment.Unpin(pinnedSegments)
if err != nil {
return err
}
@ -124,7 +125,14 @@ func (t *QueryTask) Execute() error {
t.collection.Schema(),
)
beforeReduce := time.Now()
reducedResult, err := reducer.Reduce(t.ctx, results, querySegments, retrievePlan)
reduceResults := make([]*segcorepb.RetrieveResults, 0, len(results))
querySegments := make([]segments.Segment, 0, len(results))
for _, result := range results {
reduceResults = append(reduceResults, result.Result)
querySegments = append(querySegments, result.Segment)
}
reducedResult, err := reducer.Reduce(t.ctx, reduceResults, querySegments, retrievePlan)
metrics.QueryNodeReduceLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),

View File

@ -153,10 +153,10 @@ type Cache[K comparable, V any] interface {
MarkItemNeedReload(ctx context.Context, key K) bool
// Expire removes the item from the cache.
// Return true if the item is not in used and removed immediately or the item is not in cache now.
// Return false if the item is in used, it will be marked as need to be reloaded, a lazy expire is applied.
Expire(ctx context.Context, key K) (evicted bool)
// Remove removes the item from the cache.
// Return nil if the item is removed.
// Return error if the Remove operation is canceled.
Remove(ctx context.Context, key K) error
}
// lruCache extends the ccache library to provide pinning and unpinning of items.
@ -435,7 +435,22 @@ func (c *lruCache[K, V]) setAndPin(ctx context.Context, key K, value V) (*cacheI
return item, nil
}
func (c *lruCache[K, V]) Expire(ctx context.Context, key K) (evicted bool) {
func (c *lruCache[K, V]) Remove(ctx context.Context, key K) error {
for {
listener := c.waitNotifier.Listen(syncutil.VersionedListenAtLatest)
if c.tryToRemoveKey(ctx, key) {
return nil
}
if err := listener.Wait(ctx); err != nil {
log.Warn("failed to remove item for key with timeout", zap.Error(err))
return err
}
}
}
func (c *lruCache[K, V]) tryToRemoveKey(ctx context.Context, key K) (removed bool) {
c.rwlock.Lock()
defer c.rwlock.Unlock()
@ -449,8 +464,6 @@ func (c *lruCache[K, V]) Expire(ctx context.Context, key K) (evicted bool) {
c.evict(ctx, key)
return true
}
item.needReload = true
return false
}

View File

@ -122,7 +122,7 @@ func TestLRUCache(t *testing.T) {
return nil
})
assert.True(t, missing)
assert.ErrorIs(t, err, context.Canceled)
assert.ErrorIs(t, err, errTimeout)
assert.ErrorIs(t, context.Cause(ctx), errTimeout)
})
@ -282,7 +282,7 @@ func TestLRUCacheConcurrency(t *testing.T) {
return nil
})
wg.Done()
assert.ErrorIs(t, err, context.Canceled)
assert.ErrorIs(t, err, errTimeout)
assert.ErrorIs(t, context.Cause(ctx), errTimeout)
})
@ -311,7 +311,7 @@ func TestLRUCacheConcurrency(t *testing.T) {
})
wg.Done()
assert.True(t, missing)
assert.ErrorIs(t, err, context.Canceled)
assert.ErrorIs(t, err, errTimeout)
assert.ErrorIs(t, context.Cause(ctx), errTimeout)
})
@ -406,7 +406,7 @@ func TestLRUCacheConcurrency(t *testing.T) {
wg.Wait()
})
t.Run("test expire", func(t *testing.T) {
t.Run("test remove", func(t *testing.T) {
cache := NewCacheBuilder[int, int]().WithLoader(func(ctx context.Context, key int) (int, error) {
return key, nil
}).WithCapacity(5).WithFinalizer(func(ctx context.Context, key, value int) error {
@ -423,13 +423,12 @@ func TestLRUCacheConcurrency(t *testing.T) {
evicted := 0
for i := 0; i < 100; i++ {
if cache.Expire(context.Background(), i) {
if cache.Remove(context.Background(), i) == nil {
evicted++
}
}
assert.Equal(t, 100, evicted)
// all item shouldn't be evicted if they are in used.
for i := 0; i < 5; i++ {
ctx, cancel := contextutil.WithTimeoutCause(context.Background(), 2*time.Second, errTimeout)
defer cancel()
@ -440,28 +439,37 @@ func TestLRUCacheConcurrency(t *testing.T) {
for i := 0; i < 5; i++ {
go func(i int) {
defer wg.Done()
ctx, cancel := contextutil.WithTimeoutCause(context.Background(), 2*time.Second, errTimeout)
defer cancel()
cache.Do(ctx, i, func(ctx context.Context, v int) error {
time.Sleep(2 * time.Second)
cache.Do(context.Background(), i, func(ctx context.Context, v int) error {
time.Sleep(3 * time.Second)
return nil
})
}(i)
}
// wait for all goroutines to start
time.Sleep(1 * time.Second)
evicted = 0
for i := 0; i < 5; i++ {
if cache.Expire(context.Background(), i) {
evicted++
}
}
assert.Zero(t, evicted)
wg.Wait()
// no item should be evicted while it is still in use, since each Remove times out after 500ms.
evictedCount := atomic.NewInt32(0)
wgEvict := sync.WaitGroup{}
wgEvict.Add(5)
for i := 0; i < 5; i++ {
go func(i int) {
defer wgEvict.Done()
ctx, cancel := contextutil.WithTimeoutCause(context.Background(), 500*time.Millisecond, errTimeout)
defer cancel()
if cache.Remove(ctx, i) == nil {
evictedCount.Inc()
}
}(i)
}
wgEvict.Wait()
assert.Zero(t, evictedCount.Load())
// given enough time, all items should be evicted.
evicted = 0
for i := 0; i < 5; i++ {
if cache.Expire(context.Background(), i) {
if cache.Remove(context.Background(), i) == nil {
evicted++
}
}

View File

@ -57,7 +57,7 @@ func (cv *ContextCond) Wait(ctx context.Context) error {
select {
case <-ch:
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
}
cv.L.Lock()
return nil