Return error message in grpc.LoadSegment in query node (#6908)

* return load error

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* init streaming meta in loadSegment

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* fix excluded init

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

* improve impl

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/6985/head
bigsheeper 2021-08-03 22:01:27 +08:00 committed by GitHub
parent 64bdfa7481
commit 4123deef9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 32 deletions

View File

@ -521,7 +521,9 @@ func (colReplica *collectionReplica) initExcludedSegments(collectionID UniqueID)
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
colReplica.excludedSegments[collectionID] = make([]*datapb.SegmentInfo, 0)
if _, ok := colReplica.excludedSegments[collectionID]; !ok {
colReplica.excludedSegments[collectionID] = make([]*datapb.SegmentInfo, 0)
}
}
func (colReplica *collectionReplica) removeExcludedSegments(collectionID UniqueID) {

View File

@ -216,19 +216,23 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
}
log.Debug("watchDmChannelsTask Enqueue done", zap.Any("collectionID", in.CollectionID))
func() {
waitFunc := func() (*commonpb.Status, error) {
err = dct.WaitToFinish()
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Warn(err.Error())
return
return status, err
}
log.Debug("watchDmChannelsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))
}()
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
return status, nil
return waitFunc()
}
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
@ -265,19 +269,23 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
}
log.Debug("loadSegmentsTask Enqueue done", zap.Int64s("segmentIDs", segmentIDs))
func() {
waitFunc := func() (*commonpb.Status, error) {
err = dct.WaitToFinish()
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Warn(err.Error())
return
return status, err
}
log.Debug("loadSegmentsTask WaitToFinish done", zap.Int64s("segmentIDs", segmentIDs))
}()
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
return status, nil
return waitFunc()
}
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {

View File

@ -76,22 +76,6 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
partitionID := info.PartitionID
collectionID := info.CollectionID
// init replica
hasCollectionInHistorical := loader.historicalReplica.hasCollection(collectionID)
hasPartitionInHistorical := loader.historicalReplica.hasPartition(partitionID)
if !hasCollectionInHistorical {
err := loader.historicalReplica.addCollection(collectionID, req.Schema)
if err != nil {
return err
}
}
if !hasPartitionInHistorical {
err := loader.historicalReplica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}
collection, err := loader.historicalReplica.getCollectionByID(collectionID)
if err != nil {
log.Warn(err.Error())

View File

@ -144,8 +144,8 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
w.node.streaming.replica.initExcludedSegments(collectionID)
}
w.node.streaming.replica.initExcludedSegments(collectionID)
if hasCollectionInHistorical := w.node.historical.replica.hasCollection(collectionID); !hasCollectionInHistorical {
err := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
if err != nil {
@ -340,6 +340,40 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(l.req)))
var err error
// init meta
for _, info := range l.req.Infos {
collectionID := info.CollectionID
partitionID := info.PartitionID
hasCollectionInHistorical := l.node.historical.replica.hasCollection(collectionID)
hasPartitionInHistorical := l.node.historical.replica.hasPartition(partitionID)
if !hasCollectionInHistorical {
err = l.node.historical.replica.addCollection(collectionID, l.req.Schema)
if err != nil {
return err
}
}
if !hasPartitionInHistorical {
err = l.node.historical.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}
hasCollectionInStreaming := l.node.streaming.replica.hasCollection(collectionID)
hasPartitionInStreaming := l.node.streaming.replica.hasPartition(partitionID)
if !hasCollectionInStreaming {
err = l.node.streaming.replica.addCollection(collectionID, l.req.Schema)
if err != nil {
return err
}
}
if !hasPartitionInStreaming {
err = l.node.streaming.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}
}
switch l.req.LoadCondition {
case queryPb.TriggerCondition_handoff:
err = l.node.historical.loader.loadSegmentOfConditionHandOff(l.req)