Fix query hang bug (#5709)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/5722/head
yukun 2021-06-10 14:33:49 +08:00 committed by GitHub
parent 346366386a
commit e97936ba1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 9 additions and 4 deletions

View File

@ -1217,7 +1217,7 @@ func (node *ProxyNode) Retrieve(ctx context.Context, request *milvuspb.RetrieveR
zap.String("db", request.DbName), zap.String("db", request.DbName),
zap.String("collection", request.CollectionName), zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames), zap.Any("partitions", request.PartitionNames),
zap.Any("len(Ids)", len(request.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data))) zap.Any("len(Ids)", len(rt.result.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data)))
}() }()
err = rt.WaitToFinish() err = rt.WaitToFinish()
@ -1368,7 +1368,7 @@ func (node *ProxyNode) Query(ctx context.Context, request *milvuspb.QueryRequest
zap.String("db", retrieveRequest.DbName), zap.String("db", retrieveRequest.DbName),
zap.String("collection", retrieveRequest.CollectionName), zap.String("collection", retrieveRequest.CollectionName),
zap.Any("partitions", retrieveRequest.PartitionNames), zap.Any("partitions", retrieveRequest.PartitionNames),
zap.Any("len(Ids)", len(retrieveRequest.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data))) zap.Any("len(Ids)", len(rt.result.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data)))
}() }()
err = rt.WaitToFinish() err = rt.WaitToFinish()

View File

@ -94,6 +94,12 @@ func (rc *retrieveCollection) getServiceableTime() Timestamp {
func (rc *retrieveCollection) setServiceableTime(t Timestamp) { func (rc *retrieveCollection) setServiceableTime(t Timestamp) {
rc.serviceableTimeMutex.Lock() rc.serviceableTimeMutex.Lock()
defer rc.serviceableTimeMutex.Unlock()
if t < rc.serviceableTime {
return
}
gracefulTimeInMilliSecond := Params.GracefulTime gracefulTimeInMilliSecond := Params.GracefulTime
if gracefulTimeInMilliSecond > 0 { if gracefulTimeInMilliSecond > 0 {
gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0) gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0)
@ -101,7 +107,6 @@ func (rc *retrieveCollection) setServiceableTime(t Timestamp) {
} else { } else {
rc.serviceableTime = t rc.serviceableTime = t
} }
rc.serviceableTimeMutex.Unlock()
} }
func (rc *retrieveCollection) waitNewTSafe() Timestamp { func (rc *retrieveCollection) waitNewTSafe() Timestamp {
@ -128,7 +133,7 @@ func (rc *retrieveCollection) start() {
func (rc *retrieveCollection) register() { func (rc *retrieveCollection) register() {
// register tSafe watcher and init watcher select case // register tSafe watcher and init watcher select case
collection, err := rc.historicalReplica.getCollectionByID(rc.collectionID) collection, err := rc.streamingReplica.getCollectionByID(rc.collectionID)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
return return