mirror of https://github.com/milvus-io/milvus.git
Add msg id to log in search path. (#17654)
issue: #17513 Add msg id to log in search path, which should help us to trace a detailed request more easier. Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/17654/merge
parent
37464a281d
commit
0282f69eb4
|
@ -55,6 +55,124 @@ type searchTask struct {
|
|||
shardMgr *shardClientMgr
|
||||
}
|
||||
|
||||
func getPartitionIDs(ctx context.Context, collectionName string, partitionNames []string) (partitionIDs []UniqueID, err error) {
|
||||
for _, tag := range partitionNames {
|
||||
if err := validatePartitionTag(tag, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
partitionsMap, err := globalMetaCache.GetPartitions(ctx, collectionName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
partitionsRecord := make(map[UniqueID]bool)
|
||||
partitionIDs = make([]UniqueID, 0, len(partitionNames))
|
||||
for _, partitionName := range partitionNames {
|
||||
pattern := fmt.Sprintf("^%s$", partitionName)
|
||||
re, err := regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid partition: %s", partitionName)
|
||||
}
|
||||
found := false
|
||||
for name, pID := range partitionsMap {
|
||||
if re.MatchString(name) {
|
||||
if _, exist := partitionsRecord[pID]; !exist {
|
||||
partitionIDs = append(partitionIDs, pID)
|
||||
partitionsRecord[pID] = true
|
||||
}
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return nil, fmt.Errorf("partition name %s not found", partitionName)
|
||||
}
|
||||
}
|
||||
return partitionIDs, nil
|
||||
}
|
||||
|
||||
func parseQueryInfo(searchParamsPair []*commonpb.KeyValuePair) (*planpb.QueryInfo, error) {
|
||||
topKStr, err := funcutil.GetAttrByKeyFromRepeatedKV(TopKKey, searchParamsPair)
|
||||
if err != nil {
|
||||
return nil, errors.New(TopKKey + " not found in search_params")
|
||||
}
|
||||
topK, err := strconv.Atoi(topKStr)
|
||||
if err != nil {
|
||||
return nil, errors.New(TopKKey + " " + topKStr + " is not invalid")
|
||||
}
|
||||
|
||||
metricType, err := funcutil.GetAttrByKeyFromRepeatedKV(MetricTypeKey, searchParamsPair)
|
||||
if err != nil {
|
||||
return nil, errors.New(MetricTypeKey + " not found in search_params")
|
||||
}
|
||||
|
||||
searchParams, err := funcutil.GetAttrByKeyFromRepeatedKV(SearchParamsKey, searchParamsPair)
|
||||
if err != nil {
|
||||
return nil, errors.New(SearchParamsKey + " not found in search_params")
|
||||
}
|
||||
|
||||
roundDecimalStr, err := funcutil.GetAttrByKeyFromRepeatedKV(RoundDecimalKey, searchParamsPair)
|
||||
if err != nil {
|
||||
roundDecimalStr = "-1"
|
||||
}
|
||||
roundDecimal, err := strconv.Atoi(roundDecimalStr)
|
||||
if err != nil {
|
||||
return nil, errors.New(RoundDecimalKey + " " + roundDecimalStr + " is not invalid")
|
||||
}
|
||||
|
||||
if roundDecimal != -1 && (roundDecimal > 6 || roundDecimal < 0) {
|
||||
return nil, errors.New(RoundDecimalKey + " " + roundDecimalStr + " is not invalid")
|
||||
}
|
||||
|
||||
return &planpb.QueryInfo{
|
||||
Topk: int64(topK),
|
||||
MetricType: metricType,
|
||||
SearchParams: searchParams,
|
||||
RoundDecimal: int64(roundDecimal),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getOutputFieldIDs(schema *schemapb.CollectionSchema, outputFields []string) (outputFieldIDs []UniqueID, err error) {
|
||||
outputFieldIDs = make([]UniqueID, 0, len(outputFields))
|
||||
for _, name := range outputFields {
|
||||
hitField := false
|
||||
for _, field := range schema.GetFields() {
|
||||
if field.Name == name {
|
||||
if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector {
|
||||
return nil, errors.New("search doesn't support vector field as output_fields")
|
||||
}
|
||||
outputFieldIDs = append(outputFieldIDs, field.GetFieldID())
|
||||
|
||||
hitField = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hitField {
|
||||
errMsg := "Field " + name + " not exist"
|
||||
return nil, errors.New(errMsg)
|
||||
}
|
||||
}
|
||||
return outputFieldIDs, nil
|
||||
}
|
||||
|
||||
func getNq(req *milvuspb.SearchRequest) (int64, error) {
|
||||
if req.GetNq() == 0 {
|
||||
// keep compatible with older client version.
|
||||
x := &commonpb.PlaceholderGroup{}
|
||||
err := proto.Unmarshal(req.GetPlaceholderGroup(), x)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
total := int64(0)
|
||||
for _, h := range x.GetPlaceholders() {
|
||||
total += int64(len(h.Values))
|
||||
}
|
||||
return total, nil
|
||||
}
|
||||
return req.GetNq(), nil
|
||||
}
|
||||
|
||||
func (t *searchTask) PreExecute(ctx context.Context) error {
|
||||
sp, ctx := trace.StartSpanFromContextWithOperationName(t.TraceCtx(), "Proxy-Search-PreExecute")
|
||||
defer sp.Finish()
|
||||
|
@ -67,53 +185,24 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
|
|||
t.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
||||
|
||||
collectionName := t.request.CollectionName
|
||||
if err := validateCollectionName(collectionName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.collectionName = collectionName
|
||||
collID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
if err != nil { // err is not nil if collection not exists
|
||||
return err
|
||||
}
|
||||
t.CollectionID = collID
|
||||
t.collectionName = collectionName
|
||||
t.PartitionIDs = []UniqueID{}
|
||||
|
||||
for _, tag := range t.request.PartitionNames {
|
||||
if err := validatePartitionTag(tag, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
t.SearchRequest.DbID = 0 // todo
|
||||
t.SearchRequest.CollectionID = collID
|
||||
t.schema, _ = globalMetaCache.GetCollectionSchema(ctx, collectionName)
|
||||
|
||||
partitionsMap, err := globalMetaCache.GetPartitions(ctx, collectionName)
|
||||
// translate partition name to partition ids. Use regex-pattern to match partition name.
|
||||
t.SearchRequest.PartitionIDs, err = getPartitionIDs(ctx, collectionName, t.request.GetPartitionNames())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partitionsRecord := make(map[UniqueID]bool)
|
||||
for _, partitionName := range t.request.PartitionNames {
|
||||
pattern := fmt.Sprintf("^%s$", partitionName)
|
||||
re, err := regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
return errors.New("invalid partition names")
|
||||
}
|
||||
found := false
|
||||
for name, pID := range partitionsMap {
|
||||
if re.MatchString(name) {
|
||||
if _, exist := partitionsRecord[pID]; !exist {
|
||||
t.PartitionIDs = append(t.PartitionIDs, pID)
|
||||
partitionsRecord[pID] = true
|
||||
}
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return fmt.Errorf("partition name %s not found", partitionName)
|
||||
}
|
||||
}
|
||||
|
||||
// check if collection/partitions are loaded into query node
|
||||
loaded, err := t.checkIfLoaded(collID, t.PartitionIDs)
|
||||
loaded, err := t.checkIfLoaded(collID, t.SearchRequest.GetPartitionIDs())
|
||||
if err != nil {
|
||||
return fmt.Errorf("checkIfLoaded failed when search, collection:%v, partitions:%v, err = %s", collectionName, t.request.GetPartitionNames(), err)
|
||||
}
|
||||
|
@ -121,113 +210,56 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
|
|||
return fmt.Errorf("collection:%v or partition:%v not loaded into memory when search", collectionName, t.request.GetPartitionNames())
|
||||
}
|
||||
|
||||
// TODO(dragondriver): necessary to check if partition was loaded into query node?
|
||||
t.Base.MsgType = commonpb.MsgType_Search
|
||||
|
||||
t.schema, _ = globalMetaCache.GetCollectionSchema(ctx, collectionName)
|
||||
|
||||
outputFields, err := translateOutputFields(t.request.OutputFields, t.schema, false)
|
||||
t.request.OutputFields, err = translateOutputFields(t.request.OutputFields, t.schema, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("translate output fields", zap.Any("OutputFields", outputFields))
|
||||
t.request.OutputFields = outputFields
|
||||
log.Debug("translate output fields", zap.Int64("msgID", t.ID()),
|
||||
zap.Strings("output fields", t.request.GetOutputFields()))
|
||||
|
||||
if t.request.GetDslType() == commonpb.DslType_BoolExprV1 {
|
||||
annsField, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, t.request.SearchParams)
|
||||
annsField, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, t.request.GetSearchParams())
|
||||
if err != nil {
|
||||
return errors.New(AnnsFieldKey + " not found in search_params")
|
||||
}
|
||||
|
||||
topKStr, err := funcutil.GetAttrByKeyFromRepeatedKV(TopKKey, t.request.SearchParams)
|
||||
queryInfo, err := parseQueryInfo(t.request.GetSearchParams())
|
||||
if err != nil {
|
||||
return errors.New(TopKKey + " not found in search_params")
|
||||
return err
|
||||
}
|
||||
topK, err := strconv.Atoi(topKStr)
|
||||
if err != nil {
|
||||
return errors.New(TopKKey + " " + topKStr + " is not invalid")
|
||||
}
|
||||
|
||||
metricType, err := funcutil.GetAttrByKeyFromRepeatedKV(MetricTypeKey, t.request.SearchParams)
|
||||
if err != nil {
|
||||
return errors.New(MetricTypeKey + " not found in search_params")
|
||||
}
|
||||
t.SearchRequest.MetricType = metricType
|
||||
|
||||
searchParams, err := funcutil.GetAttrByKeyFromRepeatedKV(SearchParamsKey, t.request.SearchParams)
|
||||
if err != nil {
|
||||
return errors.New(SearchParamsKey + " not found in search_params")
|
||||
}
|
||||
roundDecimalStr, err := funcutil.GetAttrByKeyFromRepeatedKV(RoundDecimalKey, t.request.SearchParams)
|
||||
if err != nil {
|
||||
roundDecimalStr = "-1"
|
||||
}
|
||||
roundDecimal, err := strconv.Atoi(roundDecimalStr)
|
||||
if err != nil {
|
||||
return errors.New(RoundDecimalKey + " " + roundDecimalStr + " is not invalid")
|
||||
}
|
||||
|
||||
if roundDecimal != -1 && (roundDecimal > 6 || roundDecimal < 0) {
|
||||
return errors.New(RoundDecimalKey + " " + roundDecimalStr + " is not invalid")
|
||||
}
|
||||
|
||||
queryInfo := &planpb.QueryInfo{
|
||||
Topk: int64(topK),
|
||||
MetricType: metricType,
|
||||
SearchParams: searchParams,
|
||||
RoundDecimal: int64(roundDecimal),
|
||||
}
|
||||
|
||||
log.Debug("create query plan",
|
||||
//zap.Any("schema", schema),
|
||||
zap.String("dsl", t.request.Dsl),
|
||||
zap.String("anns field", annsField),
|
||||
zap.Any("query info", queryInfo))
|
||||
|
||||
plan, err := planparserv2.CreateSearchPlan(t.schema, t.request.Dsl, annsField, queryInfo)
|
||||
if err != nil {
|
||||
log.Debug("failed to create query plan",
|
||||
zap.Error(err),
|
||||
//zap.Any("schema", schema),
|
||||
zap.String("dsl", t.request.Dsl),
|
||||
zap.String("anns field", annsField),
|
||||
zap.Any("query info", queryInfo))
|
||||
|
||||
log.Debug("failed to create query plan", zap.Error(err), zap.Int64("msgID", t.ID()),
|
||||
zap.String("dsl", t.request.Dsl), // may be very large if large term passed.
|
||||
zap.String("anns field", annsField), zap.Any("query info", queryInfo))
|
||||
return fmt.Errorf("failed to create query plan: %v", err)
|
||||
}
|
||||
for _, name := range t.request.OutputFields {
|
||||
hitField := false
|
||||
for _, field := range t.schema.Fields {
|
||||
if field.Name == name {
|
||||
if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector {
|
||||
return errors.New("search doesn't support vector field as output_fields")
|
||||
}
|
||||
log.Debug("create query plan", zap.Int64("msgID", t.ID()),
|
||||
zap.String("dsl", t.request.Dsl), // may be very large if large term passed.
|
||||
zap.String("anns field", annsField), zap.Any("query info", queryInfo))
|
||||
|
||||
t.SearchRequest.OutputFieldsId = append(t.SearchRequest.OutputFieldsId, field.FieldID)
|
||||
plan.OutputFieldIds = append(plan.OutputFieldIds, field.FieldID)
|
||||
hitField = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hitField {
|
||||
errMsg := "Field " + name + " not exist"
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
outputFieldIDs, err := getOutputFieldIDs(t.schema, t.request.GetOutputFields())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.SearchRequest.OutputFieldsId = outputFieldIDs
|
||||
plan.OutputFieldIds = outputFieldIDs
|
||||
|
||||
t.SearchRequest.MetricType = queryInfo.GetMetricType()
|
||||
t.SearchRequest.DslType = commonpb.DslType_BoolExprV1
|
||||
t.SearchRequest.SerializedExprPlan, err = proto.Marshal(plan)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.SearchRequest.Topk = int64(topK)
|
||||
err = validateTopK(int64(topK))
|
||||
if err != nil {
|
||||
t.SearchRequest.Topk = queryInfo.GetTopk()
|
||||
if err := validateTopK(queryInfo.GetTopk()); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy::searchTask::PreExecute", zap.Any("plan.OutputFieldIds", plan.OutputFieldIds),
|
||||
zap.Any("plan", plan.String()))
|
||||
log.Debug("Proxy::searchTask::PreExecute", zap.Int64("msgID", t.ID()),
|
||||
zap.Int64s("plan.OutputFieldIds", plan.GetOutputFieldIds()),
|
||||
zap.String("plan", plan.String())) // may be very large if large term passed.
|
||||
}
|
||||
|
||||
travelTimestamp := t.request.TravelTimestamp
|
||||
|
@ -249,22 +281,15 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
|
|||
t.SearchRequest.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0)
|
||||
}
|
||||
|
||||
t.DbID = 0 // todo
|
||||
t.SearchRequest.Dsl = t.request.Dsl
|
||||
t.SearchRequest.PlaceholderGroup = t.request.PlaceholderGroup
|
||||
if t.request.GetNq() == 0 {
|
||||
x := &commonpb.PlaceholderGroup{}
|
||||
err := proto.Unmarshal(t.request.GetPlaceholderGroup(), x)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, h := range x.GetPlaceholders() {
|
||||
t.request.Nq += int64(len(h.Values))
|
||||
}
|
||||
if t.SearchRequest.Nq, err = getNq(t.request); err != nil {
|
||||
return err
|
||||
}
|
||||
t.SearchRequest.Nq = t.request.GetNq()
|
||||
log.Info("search PreExecute done.",
|
||||
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "search"))
|
||||
log.Info("search PreExecute done.", zap.Int64("msgID", t.ID()),
|
||||
zap.Uint64("travel_ts", travelTimestamp), zap.Uint64("guarantee_ts", guaranteeTs),
|
||||
zap.Uint64("timeout_ts", t.SearchRequest.GetTimeoutTimestamp()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -290,7 +315,7 @@ func (t *searchTask) Execute(ctx context.Context) error {
|
|||
channelID := channelID
|
||||
leaders := leaders
|
||||
t.runningGroup.Go(func() error {
|
||||
log.Debug("proxy starting to query one shard",
|
||||
log.Debug("proxy starting to query one shard", zap.Int64("msgID", t.ID()),
|
||||
zap.Int64("collectionID", t.CollectionID),
|
||||
zap.String("collection name", t.collectionName),
|
||||
zap.String("shard channel", channelID),
|
||||
|
@ -305,15 +330,15 @@ func (t *searchTask) Execute(ctx context.Context) error {
|
|||
|
||||
err := executeSearch(WithCache)
|
||||
if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) {
|
||||
log.Warn("first search failed, updating shardleader caches and retry search", zap.Error(err))
|
||||
log.Warn("first search failed, updating shardleader caches and retry search",
|
||||
zap.Int64("msgID", t.ID()), zap.Error(err))
|
||||
return executeSearch(WithoutCache)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to search on all shard leaders, err=%w", err)
|
||||
}
|
||||
|
||||
log.Debug("Search Execute done.",
|
||||
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "search"))
|
||||
log.Debug("Search Execute done.", zap.Int64("msgID", t.ID()))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -331,14 +356,14 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
|
|||
for {
|
||||
select {
|
||||
case <-t.TraceCtx().Done():
|
||||
log.Debug("wait to finish timeout!", zap.Int64("taskID", t.ID()))
|
||||
log.Debug("wait to finish timeout!", zap.Int64("msgID", t.ID()))
|
||||
return
|
||||
case <-t.runningGroupCtx.Done():
|
||||
log.Debug("all searches are finished or canceled", zap.Any("taskID", t.ID()))
|
||||
log.Debug("all searches are finished or canceled", zap.Int64("msgID", t.ID()))
|
||||
close(t.resultBuf)
|
||||
for res := range t.resultBuf {
|
||||
t.toReduceResults = append(t.toReduceResults, res)
|
||||
log.Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID()), zap.Any("taskID", t.ID()))
|
||||
log.Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID()), zap.Int64("msgID", t.ID()))
|
||||
}
|
||||
wg.Done()
|
||||
return
|
||||
|
@ -354,9 +379,11 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
|
|||
}
|
||||
metrics.ProxyDecodeResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
|
||||
metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
|
||||
log.Debug("proxy search post execute stage 2", zap.Any("len(validSearchResults)", len(validSearchResults)))
|
||||
|
||||
log.Debug("proxy search post execute stage 2", zap.Int64("msgID", t.ID()),
|
||||
zap.Int("len(validSearchResults)", len(validSearchResults)))
|
||||
if len(validSearchResults) <= 0 {
|
||||
log.Warn("search result is empty", zap.Any("requestID", t.Base.MsgID), zap.String("requestType", "search"))
|
||||
log.Warn("search result is empty", zap.Int64("msgID", t.ID()))
|
||||
|
||||
t.result = &milvuspb.SearchResults{
|
||||
Status: &commonpb.Status{
|
||||
|
@ -403,7 +430,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
log.Info("Search post execute done", zap.Any("requestID", t.Base.MsgID), zap.String("requestType", "search"))
|
||||
log.Info("Search post execute done", zap.Int64("msgID", t.ID()))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -417,17 +444,18 @@ func (t *searchTask) searchShard(ctx context.Context, leaders []nodeInfo, channe
|
|||
}
|
||||
result, err := qn.Search(ctx, req)
|
||||
if err != nil {
|
||||
log.Warn("QueryNode search return error", zap.Int64("nodeID", nodeID), zap.String("channel", channelID),
|
||||
zap.Error(err))
|
||||
log.Warn("QueryNode search return error", zap.Int64("msgID", t.ID()),
|
||||
zap.Int64("nodeID", nodeID), zap.String("channel", channelID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if result.GetStatus().GetErrorCode() == commonpb.ErrorCode_NotShardLeader {
|
||||
log.Warn("QueryNode is not shardLeader", zap.Int64("nodeID", nodeID), zap.String("channel", channelID))
|
||||
log.Warn("QueryNode is not shardLeader", zap.Int64("msgID", t.ID()),
|
||||
zap.Int64("nodeID", nodeID), zap.String("channel", channelID))
|
||||
return errInvalidShardLeaders
|
||||
}
|
||||
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
log.Warn("QueryNode search result error", zap.Int64("nodeID", nodeID),
|
||||
zap.String("reason", result.GetStatus().GetReason()))
|
||||
log.Warn("QueryNode search result error", zap.Int64("msgID", t.ID()),
|
||||
zap.Int64("nodeID", nodeID), zap.String("reason", result.GetStatus().GetReason()))
|
||||
return fmt.Errorf("fail to Search, QueryNode ID=%d, reason=%s", nodeID, result.GetStatus().GetReason())
|
||||
}
|
||||
t.resultBuf <- result
|
||||
|
@ -436,7 +464,8 @@ func (t *searchTask) searchShard(ctx context.Context, leaders []nodeInfo, channe
|
|||
|
||||
err := t.searchShardPolicy(t.TraceCtx(), t.shardMgr, search, leaders)
|
||||
if err != nil {
|
||||
log.Warn("fail to search to all shard leaders", zap.Any("shard leaders", leaders))
|
||||
log.Warn("fail to search to all shard leaders", zap.Int64("msgID", t.ID()),
|
||||
zap.Any("shard leaders", leaders))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -495,9 +524,8 @@ func (t *searchTask) checkIfLoaded(collectionID UniqueID, searchPartitionIDs []U
|
|||
}
|
||||
|
||||
if len(resp.GetPartitionIDs()) > 0 {
|
||||
log.Warn("collection not fully loaded, search on these partitions",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64s("partitionIDs", resp.GetPartitionIDs()))
|
||||
log.Warn("collection not fully loaded, search on these partitions", zap.Int64("msgID", t.ID()),
|
||||
zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", resp.GetPartitionIDs()))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
@ -730,41 +758,3 @@ func (t *searchTask) OnEnqueue() error {
|
|||
t.Base.SourceID = Params.ProxyCfg.GetNodeID()
|
||||
return nil
|
||||
}
|
||||
|
||||
// func (t *searchTaskV2) getChannels() ([]pChan, error) {
|
||||
// collID, err := globalMetaCache.GetCollectionID(t.ctx, t.request.CollectionName)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
//
|
||||
// var channels []pChan
|
||||
// channels, err = t.chMgr.getChannels(collID)
|
||||
// if err != nil {
|
||||
// err := t.chMgr.createDMLMsgStream(collID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return t.chMgr.getChannels(collID)
|
||||
// }
|
||||
//
|
||||
// return channels, nil
|
||||
// }
|
||||
|
||||
// func (t *searchTaskV2) getVChannels() ([]vChan, error) {
|
||||
// collID, err := globalMetaCache.GetCollectionID(t.ctx, t.request.CollectionName)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
//
|
||||
// var channels []vChan
|
||||
// channels, err = t.chMgr.getVChannels(collID)
|
||||
// if err != nil {
|
||||
// err := t.chMgr.createDMLMsgStream(collID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return t.chMgr.getVChannels(collID)
|
||||
// }
|
||||
//
|
||||
// return channels, nil
|
||||
// }
|
||||
|
|
|
@ -30,9 +30,10 @@ import "C"
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"unsafe"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// SearchPlan is a wrapper of the underlying C-structure C.CSearchPlan
|
||||
|
@ -96,6 +97,7 @@ type searchRequest struct {
|
|||
plan *SearchPlan
|
||||
cPlaceholderGroup C.CPlaceholderGroup
|
||||
timestamp Timestamp
|
||||
msgID UniqueID
|
||||
}
|
||||
|
||||
func newSearchRequest(collection *Collection, req *querypb.SearchRequest, placeholderGrp []byte) (*searchRequest, error) {
|
||||
|
@ -134,6 +136,7 @@ func newSearchRequest(collection *Collection, req *querypb.SearchRequest, placeh
|
|||
plan: plan,
|
||||
cPlaceholderGroup: cPlaceholderGroup,
|
||||
timestamp: req.Req.GetTravelTimestamp(),
|
||||
msgID: req.GetReq().GetReqID(),
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
|
|
|
@ -280,7 +280,8 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
|
|||
}
|
||||
|
||||
var searchResult SearchResult
|
||||
log.Debug("do search on segment", zap.Int64("segmentID", s.segmentID), zap.String("segmentType", s.segmentType.String()))
|
||||
log.Debug("do search on segment", zap.Int64("msgID", searchReq.msgID),
|
||||
zap.Int64("segmentID", s.segmentID), zap.String("segmentType", s.segmentType.String()))
|
||||
tr := timerecord.NewTimeRecorder("cgoSearch")
|
||||
status := C.Search(s.segmentPtr, searchReq.plan.cSearchPlan, searchReq.cPlaceholderGroup,
|
||||
C.uint64_t(searchReq.timestamp), &searchResult.cSearchResult, C.int64_t(s.segmentID))
|
||||
|
|
|
@ -98,7 +98,8 @@ func (s *searchTask) searchOnStreaming() error {
|
|||
s.QS.collection.RLock() // locks the collectionPtr
|
||||
defer s.QS.collection.RUnlock()
|
||||
if _, released := s.QS.collection.getReleaseTime(); released {
|
||||
log.Debug("collection release before search", zap.Int64("collectionID", s.CollectionID))
|
||||
log.Debug("collection release before search", zap.Int64("msgID", s.ID()),
|
||||
zap.Int64("collectionID", s.CollectionID))
|
||||
return fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", s.CollectionID)
|
||||
}
|
||||
|
||||
|
@ -111,7 +112,8 @@ func (s *searchTask) searchOnStreaming() error {
|
|||
// TODO add context
|
||||
partResults, _, _, sErr := searchStreaming(s.QS.metaReplica, searchReq, s.CollectionID, s.iReq.GetPartitionIDs(), s.req.GetDmlChannel())
|
||||
if sErr != nil {
|
||||
log.Debug("failed to search streaming data", zap.Int64("collectionID", s.CollectionID), zap.Error(sErr))
|
||||
log.Debug("failed to search streaming data", zap.Int64("msgID", s.ID()),
|
||||
zap.Int64("collectionID", s.CollectionID), zap.Error(sErr))
|
||||
return sErr
|
||||
}
|
||||
defer deleteSearchResults(partResults)
|
||||
|
@ -133,7 +135,8 @@ func (s *searchTask) searchOnHistorical() error {
|
|||
s.QS.collection.RLock() // locks the collectionPtr
|
||||
defer s.QS.collection.RUnlock()
|
||||
if _, released := s.QS.collection.getReleaseTime(); released {
|
||||
log.Debug("collection release before search", zap.Int64("collectionID", s.CollectionID))
|
||||
log.Debug("collection release before search", zap.Int64("msgID", s.ID()),
|
||||
zap.Int64("collectionID", s.CollectionID))
|
||||
return fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", s.CollectionID)
|
||||
}
|
||||
|
||||
|
@ -206,17 +209,16 @@ func (s *searchTask) reduceResults(searchReq *searchRequest, results []*SearchRe
|
|||
numSegment := int64(len(results))
|
||||
blobs, err := reduceSearchResultsAndFillData(searchReq.plan, results, numSegment, sInfo.sliceNQs, sInfo.sliceTopKs)
|
||||
if err != nil {
|
||||
log.Debug("marshal for historical results error", zap.Int64("msgID", s.ID()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer deleteSearchResultDataBlobs(blobs)
|
||||
if err != nil {
|
||||
log.Debug("marshal for historical results error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < cnt; i++ {
|
||||
blob, err := getSearchResultDataBlob(blobs, i)
|
||||
if err != nil {
|
||||
log.Debug("getSearchResultDataBlob for historical results error", zap.Error(err))
|
||||
log.Debug("getSearchResultDataBlob for historical results error", zap.Int64("msgID", s.ID()),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
bs := make([]byte, len(blob))
|
||||
|
|
Loading…
Reference in New Issue