Update timestmap according to gracefultime for Bounded Consistency (#17171)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/17182/head
zhenshan.cao 2022-05-24 12:05:59 +08:00 committed by GitHub
parent 8dc43fa811
commit 2512e668f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 76 additions and 44 deletions

View File

@ -153,7 +153,6 @@ queryCoord:
# Related configuration of queryNode, used to run hybrid search between vector and scalar data.
queryNode:
cacheSize: 32 # GB, default 32 GB, `cacheSize` is the memory used for caching data for faster query. The `cacheSize` must be less than system memory size.
gracefulTime: 0 # Minimum time before the newly inserted data can be searched (in ms)
port: 21123
stats:
@ -291,6 +290,7 @@ common:
retentionDuration: 432000 # 5 days in seconds
entityExpiration: -1 # Entity expiration in seconds, CAUTION make sure entityExpiration >= retentionDuration and -1 means never expire
gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency.
# Default value: auto
# Valid values: [auto, avx512, avx2, avx, sse4_2]

View File

@ -7,7 +7,6 @@ import (
"regexp"
"strings"
"sync"
"time"
"github.com/milvus-io/milvus/internal/parser/planparserv2"
@ -212,20 +211,17 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
if t.request.TravelTimestamp == 0 {
t.TravelTimestamp = t.BeginTs()
} else {
durationSeconds := tsoutil.CalculateDuration(t.BeginTs(), t.request.TravelTimestamp) / 1000
if durationSeconds > Params.CommonCfg.RetentionDuration {
duration := time.Second * time.Duration(durationSeconds)
return fmt.Errorf("only support to travel back to %s so far", duration.String())
}
t.TravelTimestamp = t.request.TravelTimestamp
}
if t.request.GuaranteeTimestamp == 0 {
t.GuaranteeTimestamp = t.BeginTs()
} else {
t.GuaranteeTimestamp = t.request.GuaranteeTimestamp
err = validateTravelTimestamp(t.TravelTimestamp, t.BeginTs())
if err != nil {
return err
}
guaranteeTs := t.request.GetGuaranteeTimestamp()
t.GuaranteeTimestamp = parseGuaranteeTs(guaranteeTs, t.BeginTs())
deadline, ok := t.TraceCtx().Deadline()
if ok {
t.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0)

View File

@ -7,7 +7,6 @@ import (
"regexp"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/parser/planparserv2"
@ -227,22 +226,21 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
log.Debug("Proxy::searchTask::PreExecute", zap.Any("plan.OutputFieldIds", plan.OutputFieldIds),
zap.Any("plan", plan.String()))
}
travelTimestamp := t.request.TravelTimestamp
if travelTimestamp == 0 {
travelTimestamp = typeutil.MaxTimestamp
} else {
durationSeconds := tsoutil.CalculateDuration(t.BeginTs(), travelTimestamp) / 1000
if durationSeconds > Params.CommonCfg.RetentionDuration {
duration := time.Second * time.Duration(durationSeconds)
return fmt.Errorf("only support to travel back to %s so far", duration.String())
}
}
guaranteeTimestamp := t.request.GuaranteeTimestamp
if guaranteeTimestamp == 0 {
guaranteeTimestamp = t.BeginTs()
err = validateTravelTimestamp(travelTimestamp, t.BeginTs())
if err != nil {
return err
}
t.SearchRequest.TravelTimestamp = travelTimestamp
t.SearchRequest.GuaranteeTimestamp = guaranteeTimestamp
guaranteeTs := t.request.GetGuaranteeTimestamp()
guaranteeTs = parseGuaranteeTs(guaranteeTs, t.BeginTs())
t.SearchRequest.GuaranteeTimestamp = guaranteeTs
deadline, ok := t.TraceCtx().Deadline()
if ok {
t.SearchRequest.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0)

View File

@ -21,11 +21,17 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const strongTS = 0
const boundedTS = 2
// enableMultipleVectorFields indicates whether to enable multiple vector fields.
const enableMultipleVectorFields = false
@ -556,6 +562,26 @@ func ValidatePassword(password string) error {
return nil
}
func validateTravelTimestamp(travelTs, tMax typeutil.Timestamp) error {
durationSeconds := tsoutil.CalculateDuration(tMax, travelTs) / 1000
if durationSeconds > Params.CommonCfg.RetentionDuration {
duration := time.Second * time.Duration(durationSeconds)
return fmt.Errorf("only support to travel back to %s so far", duration.String())
}
return nil
}
func ReplaceID2Name(oldStr string, id int64, name string) string {
return strings.ReplaceAll(oldStr, strconv.FormatInt(id, 10), name)
}
func parseGuaranteeTs(ts, tMax typeutil.Timestamp) typeutil.Timestamp {
switch ts {
case strongTS:
ts = tMax
case boundedTS:
ratio := time.Duration(-Params.CommonCfg.GracefulTime)
ts = tsoutil.AddPhysicalDurationOnTs(tMax, ratio*time.Millisecond)
}
return ts
}

View File

@ -28,8 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type queryShard struct {
@ -165,10 +163,5 @@ func (q *queryShard) getServiceableTime(tp tsType) (Timestamp, error) {
if err != nil {
return 0, err
}
gracefulTimeInMilliSecond := Params.QueryNodeCfg.GracefulTime
gracefulTime := typeutil.ZeroTimestamp
if gracefulTimeInMilliSecond > 0 {
gracefulTime = tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0)
}
return (ts + gracefulTime), nil
return ts, nil
}

View File

@ -32,6 +32,7 @@ const (
// DefaultIndexSliceSize defines the default slice size of index file when serializing.
DefaultIndexSliceSize = 16
DefaultGracefulTime = 5000 //ms
)
// ComponentParam is used to quickly and easily access all components' configurations.
@ -124,9 +125,11 @@ type commonConfig struct {
RetentionDuration int64
EntityExpirationTTL time.Duration
SimdType string
IndexSliceSize int64
StorageType string
GracefulTime int64
StorageType string
SimdType string
AuthorizationEnabled bool
}
@ -163,6 +166,7 @@ func (p *commonConfig) init(base *BaseTable) {
p.initSimdType()
p.initIndexSliceSize()
p.initGracefulTime()
p.initStorageType()
p.initEnableAuthorization()
@ -365,6 +369,10 @@ func (p *commonConfig) initIndexSliceSize() {
p.IndexSliceSize = p.Base.ParseInt64WithDefault("common.indexSliceSize", DefaultIndexSliceSize)
}
func (p *commonConfig) initGracefulTime() {
p.GracefulTime = p.Base.ParseInt64WithDefault("common.gracefulTime", DefaultGracefulTime)
}
func (p *commonConfig) initStorageType() {
p.StorageType = p.Base.LoadWithDefault("common.storageType", "minio")
}
@ -688,8 +696,7 @@ type queryNodeConfig struct {
// stats
StatsPublishInterval int
GracefulTime int64
SliceIndex int
SliceIndex int
// segcore
ChunkRows int64
@ -717,7 +724,6 @@ func (p *queryNodeConfig) init(base *BaseTable) {
p.Base = base
p.NodeID.Store(UniqueID(0))
p.initCacheSize()
p.initGracefulTime()
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
@ -796,11 +802,6 @@ func (p *queryNodeConfig) initSearchResultReceiveBufSize() {
p.SearchResultReceiveBufSize = p.Base.ParseInt64WithDefault("queryNode.msgStream.searchResult.recvBufSize", 64)
}
func (p *queryNodeConfig) initGracefulTime() {
p.GracefulTime = p.Base.ParseInt64("queryNode.gracefulTime")
log.Debug("query node init gracefulTime", zap.Any("gracefulTime", p.GracefulTime))
}
func (p *queryNodeConfig) initSmallIndexParams() {
p.ChunkRows = p.Base.ParseInt64WithDefault("queryNode.segcore.chunkRows", 32768)
if p.ChunkRows < 1024 {

View File

@ -56,6 +56,9 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, Params.IndexSliceSize, int64(DefaultIndexSliceSize))
t.Logf("knowhere index slice size = %d", Params.IndexSliceSize)
assert.Equal(t, Params.GracefulTime, int64(DefaultGracefulTime))
t.Logf("default grafeful time = %d", Params.GracefulTime)
// -- proxy --
assert.Equal(t, Params.ProxySubName, "by-dev-proxy")
t.Logf("ProxySubName: %s", Params.ProxySubName)

View File

@ -75,11 +75,11 @@ func Mod24H(ts uint64) uint64 {
return (physical << logicalBits) | logical
}
// AddPhysicalTimeOnTs adds physical time on ts and return ts
func AddPhysicalTimeOnTs(timeInMs int64, ts uint64) uint64 {
// AddPhysicalDurationOnTs adds physical interval on ts
func AddPhysicalDurationOnTs(ts uint64, duration time.Duration) uint64 {
msecs := duration.Milliseconds()
physical, logical := ParseHybridTs(ts)
return ComposeTS(physical+timeInMs, logical)
return ComposeTS(physical+msecs, logical)
}
// NewTSOKVBase returns a etcdkv.EtcdKV object

View File

@ -63,3 +63,18 @@ func TestCalculateDuration(t *testing.T) {
diff := CalculateDuration(ts2, ts1)
assert.Equal(t, durationInMilliSecs, diff)
}
func TestAddPhysicalDurationOnTs(t *testing.T) {
now := time.Now()
ts1 := ComposeTSByTime(now, 0)
duration := time.Millisecond * (20 * 1000)
ts2 := AddPhysicalDurationOnTs(ts1, duration)
ts3 := ComposeTSByTime(now.Add(duration), 0)
//diff := CalculateDuration(ts2, ts1)
assert.Equal(t, ts3, ts2)
ts2 = AddPhysicalDurationOnTs(ts1, -duration)
ts3 = ComposeTSByTime(now.Add(-duration), 0)
//diff := CalculateDuration(ts2, ts1)
assert.Equal(t, ts3, ts2)
}