mirror of https://github.com/milvus-io/milvus.git
fix: use max MvccTs for iterator (#37247)
issue: #37158 Signed-off-by: Patrick Weizhi Xu <weizhi.xu@zilliz.com>pull/37296/head^2
parent
955219057c
commit
43ad9af529
|
@ -551,7 +551,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
|
||||||
|
|
||||||
if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() == 0 {
|
if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() == 0 {
|
||||||
// first page for iteration, need to set up sessionTs for iterator
|
// first page for iteration, need to set up sessionTs for iterator
|
||||||
t.result.SessionTs = t.GetGuaranteeTimestamp()
|
t.result.SessionTs = getMaxMvccTsFromChannels(t.channelsMvcc, t.BeginTs())
|
||||||
}
|
}
|
||||||
log.Debug("Query PostExecute done")
|
log.Debug("Query PostExecute done")
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -745,7 +745,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
|
||||||
t.result.CollectionName = t.request.GetCollectionName()
|
t.result.CollectionName = t.request.GetCollectionName()
|
||||||
if t.isIterator && t.request.GetGuaranteeTimestamp() == 0 {
|
if t.isIterator && t.request.GetGuaranteeTimestamp() == 0 {
|
||||||
// first page for iteration, need to set up sessionTs for iterator
|
// first page for iteration, need to set up sessionTs for iterator
|
||||||
t.result.SessionTs = t.SearchRequest.GetGuaranteeTimestamp()
|
t.result.SessionTs = getMaxMvccTsFromChannels(t.queryChannelsTs, t.BeginTs())
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
|
metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
|
||||||
|
|
|
@ -1001,6 +1001,22 @@ func parseGuaranteeTs(ts, tMax typeutil.Timestamp) typeutil.Timestamp {
|
||||||
return ts
|
return ts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getMaxMvccTsFromChannels(channelsTs map[string]uint64, beginTs typeutil.Timestamp) typeutil.Timestamp {
|
||||||
|
maxTs := typeutil.Timestamp(0)
|
||||||
|
for _, ts := range channelsTs {
|
||||||
|
if ts > maxTs {
|
||||||
|
maxTs = ts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxTs == 0 {
|
||||||
|
log.Warn("no channel ts found, use beginTs instead")
|
||||||
|
return beginTs
|
||||||
|
}
|
||||||
|
|
||||||
|
return maxTs
|
||||||
|
}
|
||||||
|
|
||||||
func validateName(entity string, nameType string) error {
|
func validateName(entity string, nameType string) error {
|
||||||
entity = strings.TrimSpace(entity)
|
entity = strings.TrimSpace(entity)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue