Fix several code issues caused by carelessness (#17385)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/17393/head
zhenshan.cao 2022-06-06 18:58:05 +08:00 committed by GitHub
parent 9f786dd752
commit 98e95275fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 11 additions and 189 deletions

View File

@ -97,7 +97,9 @@ func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 {
}
func (info *assignInfo) RemoveExpired(ts Timestamp) {
for e := info.segInfos.Front(); e != nil; e = e.Next() {
var next *list.Element
for e := info.segInfos.Front(); e != nil; e = next {
next = e.Next()
segInfo, ok := e.Value.(*segInfo)
if !ok {
log.Warn("can not cast to segInfo")
@ -175,8 +177,10 @@ func newSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func
func (sa *segIDAssigner) collectExpired() {
ts := sa.getTickFunc()
var next *list.Element
for _, info := range sa.assignInfos {
for e := info.Front(); e != nil; e = e.Next() {
for e := info.Front(); e != nil; e = next {
next = e.Next()
assign := e.Value.(*assignInfo)
assign.RemoveExpired(ts)
if assign.Capacity(ts) == 0 {

View File

@ -1,115 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import (
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
// globalSealedSegmentManager manages the globalSealedSegments
type globalSealedSegmentManager struct {
collectionID UniqueID
mu sync.Mutex // guards globalSealedSegments
globalSealedSegments map[UniqueID]*querypb.SegmentInfo // map[segmentID]SegmentInfo
}
// newGlobalSealedSegmentManager returns a new globalSealedSegmentManager
func newGlobalSealedSegmentManager(collectionID UniqueID) *globalSealedSegmentManager {
return &globalSealedSegmentManager{
collectionID: collectionID,
globalSealedSegments: make(map[UniqueID]*querypb.SegmentInfo),
}
}
// addGlobalSegmentInfo adds a new segmentInfo
func (g *globalSealedSegmentManager) addGlobalSegmentInfo(segmentInfo *querypb.SegmentInfo) {
g.mu.Lock()
defer g.mu.Unlock()
if segmentInfo.CollectionID != g.collectionID {
log.Warn("Find mismatch collectionID when addGlobalSegmentInfo",
zap.Any("manager collectionID", g.collectionID),
zap.Any("segmentInfo collectionID", segmentInfo.CollectionID),
)
}
g.globalSealedSegments[segmentInfo.SegmentID] = segmentInfo
}
// getGlobalSegmentIDs returns globalSealedSegments
func (g *globalSealedSegmentManager) getGlobalSegmentIDs() []UniqueID {
g.mu.Lock()
defer g.mu.Unlock()
resIDs := make([]UniqueID, 0)
for _, v := range g.globalSealedSegments {
resIDs = append(resIDs, v.SegmentID)
}
return resIDs
}
// getGlobalSegmentIDsByPartitionIds returns globalSealedSegments by partitionIDs
func (g *globalSealedSegmentManager) getGlobalSegmentIDsByPartitionIds(partitionIDs []UniqueID) []UniqueID {
g.mu.Lock()
defer g.mu.Unlock()
resIDs := make([]UniqueID, 0)
for _, v := range g.globalSealedSegments {
for _, partitionID := range partitionIDs {
if v.PartitionID == partitionID {
resIDs = append(resIDs, v.SegmentID)
}
}
}
return resIDs
}
// hasGlobalSealedSegment checks if globalSealedSegmentManager has globalSealedSegment by segmentID
func (g *globalSealedSegmentManager) hasGlobalSealedSegment(segmentID UniqueID) bool {
g.mu.Lock()
defer g.mu.Unlock()
_, ok := g.globalSealedSegments[segmentID]
return ok
}
// removeGlobalSealedSegmentInfo would remove globalSealSegment by segment
func (g *globalSealedSegmentManager) removeGlobalSealedSegmentInfo(segmentID UniqueID) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.globalSealedSegments, segmentID)
}
// removeGlobalSegmentIDsByPartitionIds would remove globalSealedSegments by partitionIDs
func (g *globalSealedSegmentManager) removeGlobalSegmentIDsByPartitionIds(partitionIDs []UniqueID) {
g.mu.Lock()
defer g.mu.Unlock()
for _, v := range g.globalSealedSegments {
for _, partitionID := range partitionIDs {
if v.PartitionID == partitionID {
delete(g.globalSealedSegments, v.SegmentID)
}
}
}
}
// close would free globalSealedSegmentManager
func (g *globalSealedSegmentManager) close() {
g.mu.Lock()
defer g.mu.Unlock()
g.globalSealedSegments = make(map[UniqueID]*querypb.SegmentInfo)
}

View File

@ -1,69 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
func TestGlobalSealedSegmentManager(t *testing.T) {
manager := newGlobalSealedSegmentManager(defaultCollectionID)
assert.NotNil(t, manager)
segmentInfo := &querypb.SegmentInfo{
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
}
manager.addGlobalSegmentInfo(segmentInfo)
segmentInfo.CollectionID = 1000
manager.addGlobalSegmentInfo(segmentInfo)
ids := manager.getGlobalSegmentIDs()
assert.Len(t, ids, 1)
assert.Equal(t, segmentInfo.SegmentID, ids[0])
ids = manager.getGlobalSegmentIDsByPartitionIds([]UniqueID{defaultPartitionID})
assert.Len(t, ids, 1)
assert.Equal(t, segmentInfo.SegmentID, ids[0])
manager.removeGlobalSegmentIDsByPartitionIds([]UniqueID{defaultPartitionID})
ids = manager.getGlobalSegmentIDs()
assert.Len(t, ids, 0)
segmentInfo.CollectionID = defaultCollectionID
manager.addGlobalSegmentInfo(segmentInfo)
manager.removeGlobalSealedSegmentInfo(defaultSegmentID)
ids = manager.getGlobalSegmentIDs()
assert.Len(t, ids, 0)
has := manager.hasGlobalSealedSegment(defaultSegmentID)
assert.False(t, has)
segmentInfo.CollectionID = defaultCollectionID
manager.addGlobalSegmentInfo(segmentInfo)
manager.close()
ids = manager.getGlobalSegmentIDs()
assert.Len(t, ids, 0)
}

View File

@ -230,7 +230,7 @@ func (s *taskScheduler) popAndAddToExecute() {
if curUsage < 0 {
curUsage = 0
}
metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(curUsage))
metrics.QueryNodeEstimateCPUUsage.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(curUsage))
targetUsage := s.maxCPUUsage - curUsage
if targetUsage <= 0 {
return
@ -359,5 +359,5 @@ func (s *taskScheduler) tryMergeReadTasks() {
metrics.QueryNodeReadTaskUnsolveLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.unsolvedReadTasks.Len()))
metrics.QueryNodeReadTaskReadyLen.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(s.readyReadTasks.Len()))
readConcurrency := atomic.LoadInt32(&s.readConcurrency)
metrics.QueryNodeEstimateCPUUsage.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(readConcurrency))
metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(readConcurrency))
}

View File

@ -181,7 +181,9 @@ func (s *searchTask) estimateCPUUsage() {
segmentNum := int64(len(s.req.GetSegmentIDs()))
s.cpu = int32(s.NQ * segmentNum / 2)
}
if s.cpu > s.maxCPU {
if s.cpu <= 0 {
s.cpu = 5
} else if s.cpu > s.maxCPU {
s.cpu = s.maxCPU
}
}