Fix release collection block in query node (#11249)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/11211/head
bigsheeper 2021-11-04 19:51:07 +08:00 committed by GitHub
parent 674c75bca5
commit e21068ffbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 4 deletions

View File

@ -191,10 +191,10 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema
// removeCollection removes the collection from collectionReplica
func (colReplica *collectionReplica) removeCollection(collectionID UniqueID) error {
colReplica.mu.Lock()
colReplica.queryMu.Lock()
defer colReplica.queryMu.Unlock()
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
defer colReplica.queryMu.Unlock()
return colReplica.removeCollectionPrivate(collectionID)
}
@ -341,10 +341,10 @@ func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID,
// removePartition removes the partition from collectionReplica
func (colReplica *collectionReplica) removePartition(partitionID UniqueID) error {
colReplica.mu.Lock()
colReplica.queryMu.Lock()
defer colReplica.queryMu.Unlock()
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
defer colReplica.queryMu.Unlock()
return colReplica.removePartitionPrivate(partitionID)
}

View File

@ -17,14 +17,17 @@ import (
"encoding/binary"
"math"
"math/rand"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -705,3 +708,82 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) {
assert.Nil(t, err)
})
}
func TestQueryCollection_search_while_release(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("test search while release collection", func(t *testing.T) {
queryCollection, err := genSimpleQueryCollection(ctx, cancel)
assert.NoError(t, err)
queryChannel := genQueryChannel()
queryCollection.queryResultMsgStream.AsProducer([]Channel{queryChannel})
queryCollection.queryResultMsgStream.Start()
msg, err := genSimpleSearchMsg()
assert.NoError(t, err)
// To prevent data race in search trackCtx
searchMu := &sync.Mutex{}
runSearchWhileReleaseCollection := func(wg *sync.WaitGroup) {
go func() {
_ = queryCollection.streaming.replica.removeCollection(defaultCollectionID)
wg.Done()
}()
go func() {
searchMu.Lock()
_ = queryCollection.search(msg)
searchMu.Unlock()
wg.Done()
}()
}
wg := &sync.WaitGroup{}
for i := 0; i < 10; i++ {
log.Debug("runSearchWhileReleaseCollection", zap.Any("time", i))
wg.Add(2)
go runSearchWhileReleaseCollection(wg)
}
wg.Wait()
})
t.Run("test search while release partition", func(t *testing.T) {
queryCollection, err := genSimpleQueryCollection(ctx, cancel)
assert.NoError(t, err)
queryChannel := genQueryChannel()
queryCollection.queryResultMsgStream.AsProducer([]Channel{queryChannel})
queryCollection.queryResultMsgStream.Start()
msg, err := genSimpleSearchMsg()
assert.NoError(t, err)
// To prevent data race in search trackCtx
searchMu := &sync.Mutex{}
runSearchWhileReleasePartition := func(wg *sync.WaitGroup) {
go func() {
_ = queryCollection.streaming.replica.removePartition(defaultPartitionID)
wg.Done()
}()
go func() {
searchMu.Lock()
_ = queryCollection.search(msg)
searchMu.Unlock()
wg.Done()
}()
}
wg := &sync.WaitGroup{}
for i := 0; i < 10; i++ {
log.Debug("runSearchWhileReleasePartition", zap.Any("time", i))
wg.Add(2)
go runSearchWhileReleasePartition(wg)
}
wg.Wait()
})
}