From 32a51267051a19da7ae04ec012593907b6a9573d Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Wed, 29 Dec 2021 21:29:45 +0800 Subject: [PATCH] Add the limitation of the traceable interval to timeTravel (#14498) Signed-off-by: zhenshan.cao --- configs/milvus.yaml | 4 +- internal/datacoord/util.go | 2 +- internal/datacoord/util_test.go | 4 +- internal/proxy/impl.go | 40 ++++++-------- internal/proxy/proxy_test.go | 67 ++++++++++++++++++++++++ internal/proxy/task.go | 14 ++++- internal/util/paramtable/global_param.go | 20 +++++-- internal/util/tsoutil/tso.go | 7 +++ internal/util/tsoutil/tso_test.go | 9 ++++ 9 files changed, 132 insertions(+), 35 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index fac138632f..23e84cb60d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -177,7 +177,6 @@ dataCoord: compaction: enableAutoCompaction: true - retentionDuration: 432000 # 5 days in seconds gc: interval: 3600 # gc interval in seconds @@ -220,7 +219,7 @@ log: msgChannel: # Channel name generation rule: ${namePrefix}-${ChannelIdx} chanNamePrefix: - cluster: "by-dev" + cluster: "by-dev" rootCoordTimeTick: "rootcoord-timetick" rootCoordStatistics: "rootcoord-statistics" rootCoordDml: "rootcoord-dml" @@ -250,6 +249,7 @@ msgChannel: common: defaultPartitionName: "_default" # default partition name for a collection defaultIndexName: "_default_idx" # default index name + retentionDuration: 432000 # 5 days in seconds knowhere: # Default value: auto diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 95ba9f93aa..198db08f95 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -73,7 +73,7 @@ func getTimetravelReverseTime(ctx context.Context, allocator allocator) (*timetr } pts, _ := tsoutil.ParseTS(ts) - ttpts := pts.Add(-time.Duration(Params.DataCoordCfg.CompactionRetentionDuration) * time.Second) + ttpts := pts.Add(-time.Duration(Params.DataCoordCfg.RetentionDuration) * time.Second) tt := 
tsoutil.ComposeTS(ttpts.UnixNano()/int64(time.Millisecond), 0) return &timetravel{tt}, nil } diff --git a/internal/datacoord/util_test.go b/internal/datacoord/util_test.go index 9ef0560abd..cd6fa80cd0 100644 --- a/internal/datacoord/util_test.go +++ b/internal/datacoord/util_test.go @@ -113,10 +113,10 @@ func TestVerifyResponse(t *testing.T) { func Test_getTimetravelReverseTime(t *testing.T) { Params.Init() - Params.DataCoordCfg.CompactionRetentionDuration = 43200 // 5 days + Params.DataCoordCfg.RetentionDuration = 43200 // 12 hours in seconds (5 days would be 432000) tFixed := time.Date(2021, 11, 15, 0, 0, 0, 0, time.Local) - tBefore := tFixed.Add(-time.Duration(Params.DataCoordCfg.CompactionRetentionDuration) * time.Second) + tBefore := tFixed.Add(-time.Duration(Params.DataCoordCfg.RetentionDuration) * time.Second) type args struct { allocator allocator diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index ed7d8da1a1..d4a5e74b7f 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2341,14 +2341,6 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* defer sp.Finish() traceID, _, _ := trace.InfoFromSpan(sp) - queryRequest := &milvuspb.QueryRequest{ - DbName: request.DbName, - CollectionName: request.CollectionName, - PartitionNames: request.PartitionNames, - Expr: request.Expr, - OutputFields: request.OutputFields, - } - qt := &queryTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -2360,7 +2352,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* ResultChannelID: strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), }, resultBuf: make(chan []*internalpb.RetrieveResults), - query: queryRequest, + query: request, chMgr: node.chMgr, qc: node.queryCoord, } @@ -2371,9 +2363,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* rpcReceived(method), zap.String("traceID", traceID), zap.String("role", typeutil.ProxyRole), - zap.String("db", queryRequest.DbName), - zap.String("collection", 
queryRequest.CollectionName), - zap.Any("partitions", queryRequest.PartitionNames)) + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.Any("partitions", request.PartitionNames)) if err := node.sched.dqQueue.Enqueue(qt); err != nil { log.Warn( @@ -2381,9 +2373,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* zap.Error(err), zap.String("traceID", traceID), zap.String("role", typeutil.ProxyRole), - zap.String("db", queryRequest.DbName), - zap.String("collection", queryRequest.CollectionName), - zap.Any("partitions", queryRequest.PartitionNames)) + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.Any("partitions", request.PartitionNames)) return &milvuspb.QueryResults{ Status: &commonpb.Status{ @@ -2400,9 +2392,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* zap.Int64("MsgID", qt.ID()), zap.Uint64("BeginTs", qt.BeginTs()), zap.Uint64("EndTs", qt.EndTs()), - zap.String("db", queryRequest.DbName), - zap.String("collection", queryRequest.CollectionName), - zap.Any("partitions", queryRequest.PartitionNames)) + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.Any("partitions", request.PartitionNames)) if err := qt.WaitToFinish(); err != nil { log.Warn( @@ -2413,9 +2405,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* zap.Int64("MsgID", qt.ID()), zap.Uint64("BeginTs", qt.BeginTs()), zap.Uint64("EndTs", qt.EndTs()), - zap.String("db", queryRequest.DbName), - zap.String("collection", queryRequest.CollectionName), - zap.Any("partitions", queryRequest.PartitionNames)) + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.Any("partitions", request.PartitionNames)) return &milvuspb.QueryResults{ Status: &commonpb.Status{ @@ -2432,9 +2424,9 @@ func (node *Proxy) Query(ctx context.Context, request 
*milvuspb.QueryRequest) (* zap.Int64("MsgID", qt.ID()), zap.Uint64("BeginTs", qt.BeginTs()), zap.Uint64("EndTs", qt.EndTs()), - zap.String("db", queryRequest.DbName), - zap.String("collection", queryRequest.CollectionName), - zap.Any("partitions", queryRequest.PartitionNames)) + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.Any("partitions", request.PartitionNames)) return &milvuspb.QueryResults{ Status: qt.result.Status, diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 4565891041..5c201a66c4 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/tsoutil" "go.uber.org/zap" @@ -1256,6 +1257,32 @@ func TestProxy(t *testing.T) { // TODO(dragondriver): compare search result }) + wg.Add(1) + t.Run("search_travel", func(t *testing.T) { + defer wg.Done() + past := time.Now().Add(time.Duration(-1*Params.ProxyCfg.RetentionDuration-100) * time.Second) + travelTs := tsoutil.ComposeTSByTime(past, 0) + req := constructSearchRequest() + req.TravelTimestamp = travelTs + //resp, err := proxy.Search(ctx, req) + res, err := proxy.Search(ctx, req) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, res.Status.ErrorCode) + }) + + wg.Add(1) + t.Run("search_travel_succ", func(t *testing.T) { + defer wg.Done() + past := time.Now().Add(time.Duration(-1*Params.ProxyCfg.RetentionDuration+100) * time.Second) + travelTs := tsoutil.ComposeTSByTime(past, 0) + req := constructSearchRequest() + req.TravelTimestamp = travelTs + //resp, err := proxy.Search(ctx, req) + res, err := proxy.Search(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, res.Status.ErrorCode) + }) + wg.Add(1) t.Run("query", func(t *testing.T) { defer wg.Done() @@ -1275,6 +1302,46 @@ func TestProxy(t 
*testing.T) { // assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) // TODO(dragondriver): compare query result }) + + wg.Add(1) + t.Run("query_travel", func(t *testing.T) { + defer wg.Done() + past := time.Now().Add(time.Duration(-1*Params.ProxyCfg.RetentionDuration-100) * time.Second) + travelTs := tsoutil.ComposeTSByTime(past, 0) + queryReq := &milvuspb.QueryRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + Expr: expr, + OutputFields: nil, + PartitionNames: nil, + TravelTimestamp: travelTs, + GuaranteeTimestamp: 0, + } + res, err := proxy.Query(ctx, queryReq) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, res.Status.ErrorCode) + }) + + wg.Add(1) + t.Run("query_travel_succ", func(t *testing.T) { + defer wg.Done() + past := time.Now().Add(time.Duration(-1*Params.ProxyCfg.RetentionDuration+100) * time.Second) + travelTs := tsoutil.ComposeTSByTime(past, 0) + queryReq := &milvuspb.QueryRequest{ + Base: nil, + DbName: dbName, + CollectionName: collectionName, + Expr: expr, + OutputFields: nil, + PartitionNames: nil, + TravelTimestamp: travelTs, + GuaranteeTimestamp: 0, + } + res, err := proxy.Query(ctx, queryReq) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_EmptyCollection, res.Status.ErrorCode) + }) } wg.Add(1) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 20fde7e4e2..16b8f3b71a 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -28,6 +28,7 @@ import ( "sort" "strconv" "strings" + "time" "unsafe" "github.com/golang/protobuf/proto" @@ -1577,6 +1578,12 @@ func (st *searchTask) PreExecute(ctx context.Context) error { travelTimestamp := st.query.TravelTimestamp if travelTimestamp == 0 { travelTimestamp = st.BeginTs() + } else { + durationSeconds := tsoutil.CalculateDuration(st.BeginTs(), travelTimestamp) / 1000 + if durationSeconds > Params.ProxyCfg.RetentionDuration { + duration := time.Second * time.Duration(Params.ProxyCfg.RetentionDuration) + return 
fmt.Errorf("only support to travel back to %s so far", duration.String()) + } } guaranteeTimestamp := st.query.GuaranteeTimestamp if guaranteeTimestamp == 0 { @@ -2186,10 +2193,15 @@ func (qt *queryTask) PreExecute(ctx context.Context) error { if err != nil { return err } - travelTimestamp := qt.query.TravelTimestamp if travelTimestamp == 0 { travelTimestamp = qt.BeginTs() + } else { + durationSeconds := tsoutil.CalculateDuration(qt.BeginTs(), travelTimestamp) / 1000 + if durationSeconds > Params.ProxyCfg.RetentionDuration { + duration := time.Second * time.Duration(Params.ProxyCfg.RetentionDuration) + return fmt.Errorf("only support to travel back to %s so far", duration.String()) + } } guaranteeTimestamp := qt.query.GuaranteeTimestamp if guaranteeTimestamp == 0 { diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index 2a553c3202..38ed25cb35 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -42,6 +42,9 @@ const ( // SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message. SuggestPulsarMaxMessageSize = 5 * 1024 * 1024 + + // DefaultRetentionDuration defines the default duration for retention which is 5 days in seconds. + DefaultRetentionDuration = 3600 * 24 * 5 ) // GlobalParamTable is a derived struct of BaseParamTable. 
@@ -489,6 +492,8 @@ type proxyConfig struct { PulsarMaxMessageSize int + RetentionDuration int64 + CreatedTime time.Time UpdatedTime time.Time } @@ -518,6 +523,7 @@ func (p *proxyConfig) init(bp *BaseParamTable) { p.initMaxTaskNum() p.initBufFlagExpireTime() p.initBufFlagCleanupInterval() + p.initRetentionDuration() } // Refresh is called after session init @@ -668,6 +674,10 @@ func (p *proxyConfig) initBufFlagCleanupInterval() { p.BufFlagCleanupInterval = time.Duration(interval) * time.Second } +func (p *proxyConfig) initRetentionDuration() { + p.RetentionDuration = p.BaseParams.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration) +} + /////////////////////////////////////////////////////////////////////////////// // --- querycoord --- type queryCoordConfig struct { @@ -1298,8 +1308,8 @@ type dataCoordConfig struct { EnableCompaction bool EnableGarbageCollection bool - CompactionRetentionDuration int64 - EnableAutoCompaction bool + RetentionDuration int64 + EnableAutoCompaction bool // Garbage Collection GCInterval time.Duration @@ -1342,7 +1352,7 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) { p.initMinioBucketName() p.initMinioRootPath() - p.initCompactionRetentionDuration() + p.initRetentionDuration() p.initEnableAutoCompaction() p.initEnableGarbageCollection() @@ -1556,8 +1566,8 @@ func (p *dataCoordConfig) initMinioRootPath() { p.MinioRootPath = rootPath } -func (p *dataCoordConfig) initCompactionRetentionDuration() { - p.CompactionRetentionDuration = p.BaseParams.ParseInt64WithDefault("dataCoord.compaction.retentionDuration", 432000) +func (p *dataCoordConfig) initRetentionDuration() { + p.RetentionDuration = p.BaseParams.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration) } func (p *dataCoordConfig) initEnableAutoCompaction() { diff --git a/internal/util/tsoutil/tso.go b/internal/util/tsoutil/tso.go index dac6530f8b..8ccfd825d9 100644 --- a/internal/util/tsoutil/tso.go +++ b/internal/util/tsoutil/tso.go 
@@ -55,6 +55,13 @@ func ParseHybridTs(ts uint64) (int64, int64) { return int64(physical), int64(logical) } +// CalculateDuration returns the number of milliseconds obtained by subtracting ts2 from ts1. +func CalculateDuration(ts1, ts2 typeutil.Timestamp) int64 { + p1, _ := ParseHybridTs(ts1) + p2, _ := ParseHybridTs(ts2) + return p1 - p2 +} + // Mod24H parses the ts to millisecond in one day func Mod24H(ts uint64) uint64 { logical := ts & logicalBitsMask diff --git a/internal/util/tsoutil/tso_test.go b/internal/util/tsoutil/tso_test.go index c6d95cefbc..1b9e9fc550 100644 --- a/internal/util/tsoutil/tso_test.go +++ b/internal/util/tsoutil/tso_test.go @@ -49,3 +49,12 @@ func Test_Tso(t *testing.T) { assert.Equal(t, typeutil.ZeroTimestamp, l) }) } + +func TestCalculateDuration(t *testing.T) { + now := time.Now() + ts1 := ComposeTSByTime(now, 0) + durationInMilliSecs := int64(20 * 1000) + ts2 := ComposeTSByTime(now.Add(time.Duration(durationInMilliSecs)*time.Millisecond), 0) + diff := CalculateDuration(ts2, ts1) + assert.Equal(t, durationInMilliSecs, diff) +}