mirror of https://github.com/milvus-io/milvus.git
Add mutex for query node map, search buffer and dataSyncService
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
parent
cee0c4df11
commit
1e4afda824
|
@ -55,7 +55,7 @@ jobs:
|
|||
key: ubuntu${{ matrix.ubuntu }}-go-mod-${{ hashFiles('**/go.sum') }}
|
||||
restore-keys: ubuntu${{ matrix.ubuntu }}-go-mod-
|
||||
- name: Dockerfile Lint
|
||||
uses: reviewdog/action-hadolint@v1
|
||||
uses: reviewdog/action-hadolint@v1.16.1
|
||||
with:
|
||||
github_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
reporter: github-pr-check # Default is github-pr-check
|
||||
|
|
|
@ -49,6 +49,7 @@ func (dsService *dataSyncService) close() {
|
|||
if dsService.fg != nil {
|
||||
dsService.fg.Close()
|
||||
}
|
||||
log.Debug("dataSyncService closed", zap.Int64("collectionID", dsService.collectionID))
|
||||
}
|
||||
|
||||
func (dsService *dataSyncService) initNodes() {
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -46,11 +47,12 @@ type QueryNode struct {
|
|||
replica ReplicaInterface
|
||||
|
||||
// internal services
|
||||
dataSyncServices map[UniqueID]*dataSyncService
|
||||
metaService *metaService
|
||||
searchService *searchService
|
||||
loadService *loadService
|
||||
statsService *statsService
|
||||
dsServicesMu sync.Mutex // guards dataSyncServices
|
||||
dataSyncServices map[UniqueID]*dataSyncService
|
||||
|
||||
// clients
|
||||
masterService types.MasterService
|
||||
|
@ -242,6 +244,32 @@ func (node *QueryNode) SetDataService(data types.DataService) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (node *QueryNode) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) {
|
||||
node.dsServicesMu.Lock()
|
||||
defer node.dsServicesMu.Unlock()
|
||||
ds, ok := node.dataSyncServices[collectionID]
|
||||
if !ok {
|
||||
return nil, errors.New("cannot found dataSyncService, collectionID =" + fmt.Sprintln(collectionID))
|
||||
}
|
||||
return ds, nil
|
||||
}
|
||||
|
||||
func (node *QueryNode) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error {
|
||||
node.dsServicesMu.Lock()
|
||||
defer node.dsServicesMu.Unlock()
|
||||
if _, ok := node.dataSyncServices[collectionID]; ok {
|
||||
return errors.New("dataSyncService has been existed, collectionID =" + fmt.Sprintln(collectionID))
|
||||
}
|
||||
node.dataSyncServices[collectionID] = ds
|
||||
return nil
|
||||
}
|
||||
|
||||
func (node *QueryNode) removeDataSyncService(collectionID UniqueID) {
|
||||
node.dsServicesMu.Lock()
|
||||
defer node.dsServicesMu.Unlock()
|
||||
delete(node.dataSyncServices, collectionID)
|
||||
}
|
||||
|
||||
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
stats := &internalpb.ComponentStates{
|
||||
Status: &commonpb.Status{
|
||||
|
@ -367,8 +395,8 @@ func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.Remov
|
|||
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
|
||||
log.Debug("starting WatchDmChannels ...", zap.String("ChannelIDs", fmt.Sprintln(in.ChannelIDs)))
|
||||
collectionID := in.CollectionID
|
||||
service, ok := node.dataSyncServices[collectionID]
|
||||
if !ok || service.dmStream == nil {
|
||||
ds, err := node.getDataSyncService(collectionID)
|
||||
if err != nil || ds.dmStream == nil {
|
||||
errMsg := "null data sync service or null data manipulation stream, collectionID = " + fmt.Sprintln(collectionID)
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
@ -378,7 +406,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
|
|||
return status, errors.New(errMsg)
|
||||
}
|
||||
|
||||
switch t := service.dmStream.(type) {
|
||||
switch t := ds.dmStream.(type) {
|
||||
case *pulsarms.PulsarTtMsgStream:
|
||||
case *rmqms.RmqTtMsgStream:
|
||||
default:
|
||||
|
@ -424,9 +452,9 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
|
|||
}
|
||||
}
|
||||
|
||||
service.dmStream.AsConsumer(toDirSubChannels, consumeSubName)
|
||||
ds.dmStream.AsConsumer(toDirSubChannels, consumeSubName)
|
||||
for _, pos := range toSeekInfo {
|
||||
err := service.dmStream.Seek(pos)
|
||||
err := ds.dmStream.Seek(pos)
|
||||
if err != nil {
|
||||
errMsg := "msgStream seek error :" + err.Error()
|
||||
status := &commonpb.Status{
|
||||
|
@ -470,8 +498,16 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
|
|||
return status, err
|
||||
}
|
||||
node.replica.initExcludedSegments(collectionID)
|
||||
node.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory, collectionID)
|
||||
go node.dataSyncServices[collectionID].start()
|
||||
newDS := newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory, collectionID)
|
||||
// ignore duplicated dataSyncService error
|
||||
node.addDataSyncService(collectionID, newDS)
|
||||
ds, err := node.getDataSyncService(collectionID)
|
||||
if err != nil {
|
||||
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||
status.Reason = err.Error()
|
||||
return status, err
|
||||
}
|
||||
go ds.start()
|
||||
node.searchService.startSearchCollection(collectionID)
|
||||
}
|
||||
if !hasPartition {
|
||||
|
@ -505,9 +541,10 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
|
|||
}
|
||||
|
||||
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
||||
if _, ok := node.dataSyncServices[in.CollectionID]; ok {
|
||||
node.dataSyncServices[in.CollectionID].close()
|
||||
delete(node.dataSyncServices, in.CollectionID)
|
||||
ds, err := node.getDataSyncService(in.CollectionID)
|
||||
if err == nil && ds != nil {
|
||||
ds.close()
|
||||
node.removeDataSyncService(in.CollectionID)
|
||||
node.replica.removeTSafe(in.CollectionID)
|
||||
node.replica.removeExcludedSegments(in.CollectionID)
|
||||
}
|
||||
|
@ -516,7 +553,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas
|
|||
node.searchService.stopSearchCollection(in.CollectionID)
|
||||
}
|
||||
|
||||
err := node.replica.removeCollection(in.CollectionID)
|
||||
err = node.replica.removeCollection(in.CollectionID)
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
|
|
@ -25,8 +25,9 @@ type searchCollection struct {
|
|||
collectionID UniqueID
|
||||
replica ReplicaInterface
|
||||
|
||||
msgBuffer chan *msgstream.SearchMsg
|
||||
unsolvedMsg []*msgstream.SearchMsg
|
||||
msgBuffer chan *msgstream.SearchMsg
|
||||
unsolvedMSgMu sync.Mutex // guards unsolvedMsg
|
||||
unsolvedMsg []*msgstream.SearchMsg
|
||||
|
||||
tSafeMutex sync.Mutex
|
||||
tSafeWatcher *tSafeWatcher
|
||||
|
@ -75,6 +76,20 @@ func (s *searchCollection) register(collectionID UniqueID) {
|
|||
tSafe.registerTSafeWatcher(s.tSafeWatcher)
|
||||
}
|
||||
|
||||
func (s *searchCollection) addToUnsolvedMsg(msg *msgstream.SearchMsg) {
|
||||
s.unsolvedMSgMu.Lock()
|
||||
defer s.unsolvedMSgMu.Unlock()
|
||||
s.unsolvedMsg = append(s.unsolvedMsg, msg)
|
||||
}
|
||||
|
||||
func (s *searchCollection) popAllUnsolvedMsg() []*msgstream.SearchMsg {
|
||||
s.unsolvedMSgMu.Lock()
|
||||
defer s.unsolvedMSgMu.Unlock()
|
||||
tmp := s.unsolvedMsg
|
||||
s.unsolvedMsg = s.unsolvedMsg[:0]
|
||||
return tmp
|
||||
}
|
||||
|
||||
func (s *searchCollection) waitNewTSafe() (Timestamp, error) {
|
||||
// block until dataSyncService updating tSafe
|
||||
s.tSafeWatcher.hasUpdate()
|
||||
|
@ -122,7 +137,7 @@ func (s *searchCollection) receiveSearchMsg() {
|
|||
case sm := <-s.msgBuffer:
|
||||
serviceTime := s.getServiceableTime()
|
||||
if sm.BeginTs() > serviceTime {
|
||||
s.unsolvedMsg = append(s.unsolvedMsg, sm)
|
||||
s.addToUnsolvedMsg(sm)
|
||||
continue
|
||||
}
|
||||
err := s.search(sm)
|
||||
|
@ -153,15 +168,14 @@ func (s *searchCollection) doUnsolvedMsgSearch() {
|
|||
}
|
||||
|
||||
searchMsg := make([]*msgstream.SearchMsg, 0)
|
||||
tempMsg := s.unsolvedMsg
|
||||
s.unsolvedMsg = s.unsolvedMsg[:0]
|
||||
tempMsg := s.popAllUnsolvedMsg()
|
||||
|
||||
for _, sm := range tempMsg {
|
||||
if sm.EndTs() <= serviceTime {
|
||||
searchMsg = append(searchMsg, sm)
|
||||
continue
|
||||
}
|
||||
s.unsolvedMsg = append(s.unsolvedMsg, sm)
|
||||
s.addToUnsolvedMsg(sm)
|
||||
}
|
||||
|
||||
if len(searchMsg) <= 0 {
|
||||
|
|
|
@ -2,6 +2,7 @@ package queryservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
|
@ -10,7 +11,9 @@ import (
|
|||
)
|
||||
|
||||
type queryNodeInfo struct {
|
||||
client types.QueryNode
|
||||
client types.QueryNode
|
||||
|
||||
mu sync.Mutex // guards segments and channels2Col
|
||||
segments map[UniqueID][]UniqueID
|
||||
channels2Col map[UniqueID][]string
|
||||
}
|
||||
|
@ -32,6 +35,8 @@ func (qn *queryNodeInfo) WatchDmChannels(ctx context.Context, in *querypb.WatchD
|
|||
}
|
||||
|
||||
func (qn *queryNodeInfo) AddDmChannels(channels []string, collectionID UniqueID) {
|
||||
qn.mu.Lock()
|
||||
defer qn.mu.Unlock()
|
||||
if _, ok := qn.channels2Col[collectionID]; !ok {
|
||||
chs := make([]string, 0)
|
||||
qn.channels2Col[collectionID] = chs
|
||||
|
@ -39,7 +44,15 @@ func (qn *queryNodeInfo) AddDmChannels(channels []string, collectionID UniqueID)
|
|||
qn.channels2Col[collectionID] = append(qn.channels2Col[collectionID], channels...)
|
||||
}
|
||||
|
||||
func (qn *queryNodeInfo) getChannels2Col() map[UniqueID][]string {
|
||||
qn.mu.Lock()
|
||||
defer qn.mu.Unlock()
|
||||
return qn.channels2Col
|
||||
}
|
||||
|
||||
func (qn *queryNodeInfo) AddSegments(segmentIDs []UniqueID, collectionID UniqueID) {
|
||||
qn.mu.Lock()
|
||||
defer qn.mu.Unlock()
|
||||
if _, ok := qn.segments[collectionID]; !ok {
|
||||
seg := make([]UniqueID, 0)
|
||||
qn.segments[collectionID] = seg
|
||||
|
@ -53,6 +66,8 @@ func (qn *queryNodeInfo) AddQueryChannel(ctx context.Context, in *querypb.AddQue
|
|||
|
||||
func (qn *queryNodeInfo) ReleaseCollection(ctx context.Context, in *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
||||
status, err := qn.client.ReleaseCollection(ctx, in)
|
||||
qn.mu.Lock()
|
||||
defer qn.mu.Unlock()
|
||||
if err != nil {
|
||||
return status, err
|
||||
}
|
||||
|
|
|
@ -757,7 +757,7 @@ func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[int6
|
|||
maxNumChannels := 0
|
||||
for _, node := range qs.queryNodes {
|
||||
numChannels := 0
|
||||
for _, chs := range node.channels2Col {
|
||||
for _, chs := range node.getChannels2Col() {
|
||||
numChannels += len(chs)
|
||||
}
|
||||
if numChannels > maxNumChannels {
|
||||
|
|
Loading…
Reference in New Issue