enhance: Skip timestamp allocation when search/query consistency level is eventually (#29773)

issue: #29772 
1. Skip timestamp allocation when search/query consistency level is
eventually.

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/30718/head
cai.zhang 2024-02-21 09:52:59 +08:00 committed by GitHub
parent 1200bcc65d
commit 40ca98f57f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 582 additions and 43 deletions

View File

@ -87,6 +87,8 @@ type Cache interface {
RemoveDatabase(ctx context.Context, database string)
HasDatabase(ctx context.Context, database string) bool
// AllocID is only using on requests that need to skip timestamp allocation, don't overuse it.
AllocID(ctx context.Context) (int64, error)
}
type collectionBasicInfo struct {
@ -246,6 +248,11 @@ type MetaCache struct {
leaderMut sync.RWMutex
shardMgr shardClientMgr
sfGlobal conc.Singleflight[*collectionInfo]
IDStart int64
IDCount int64
IDIndex int64
IDLock sync.RWMutex
}
// globalMetaCache is singleton instance of Cache
@ -987,3 +994,27 @@ func (m *MetaCache) HasDatabase(ctx context.Context, database string) bool {
_, ok := m.collInfo[database]
return ok
}
func (m *MetaCache) AllocID(ctx context.Context) (int64, error) {
m.IDLock.Lock()
defer m.IDLock.Unlock()
if m.IDIndex == m.IDCount {
resp, err := m.rootCoord.AllocID(ctx, &rootcoordpb.AllocIDRequest{
Count: 1000000,
})
if err != nil {
log.Warn("Refreshing ID cache from rootcoord failed", zap.Error(err))
return 0, err
}
if resp.GetStatus().GetCode() != 0 {
log.Warn("Refreshing ID cache from rootcoord failed", zap.String("failed detail", resp.GetStatus().GetDetail()))
return 0, merr.WrapErrServiceInternal(resp.GetStatus().GetDetail())
}
m.IDStart, m.IDCount = resp.GetID(), int64(resp.GetCount())
m.IDIndex = 0
}
id := m.IDStart + m.IDIndex
m.IDIndex++
return id, nil
}

View File

@ -816,3 +816,68 @@ func TestMetaCache_Database(t *testing.T) {
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), true)
assert.Equal(t, CheckDatabase(ctx, dbName), true)
}
func TestMetaCache_AllocID(t *testing.T) {
ctx := context.Background()
queryCoord := &mocks.MockQueryCoordClient{}
shardMgr := newShardClientMgr()
t.Run("success", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
Status: merr.Status(nil),
ID: 11198,
Count: 10,
}, nil)
rootCoord.EXPECT().ListPolicy(mock.Anything, mock.Anything).Return(&internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
}, nil)
err := InitMetaCache(ctx, rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), false)
id, err := globalMetaCache.AllocID(ctx)
assert.NoError(t, err)
assert.Equal(t, id, int64(11198))
})
t.Run("error", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
Status: merr.Status(nil),
}, fmt.Errorf("mock error"))
rootCoord.EXPECT().ListPolicy(mock.Anything, mock.Anything).Return(&internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
}, nil)
err := InitMetaCache(ctx, rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), false)
id, err := globalMetaCache.AllocID(ctx)
assert.Error(t, err)
assert.Equal(t, id, int64(0))
})
t.Run("failed", func(t *testing.T) {
rootCoord := mocks.NewMockRootCoordClient(t)
rootCoord.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
Status: merr.Status(fmt.Errorf("mock failed")),
}, nil)
rootCoord.EXPECT().ListPolicy(mock.Anything, mock.Anything).Return(&internalpb.ListPolicyResponse{
Status: merr.Success(),
PolicyInfos: []string{"policy1", "policy2", "policy3"},
}, nil)
err := InitMetaCache(ctx, rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), false)
id, err := globalMetaCache.AllocID(ctx)
assert.Error(t, err)
assert.Equal(t, id, int64(0))
})
}

View File

@ -24,6 +24,58 @@ func (_m *MockCache) EXPECT() *MockCache_Expecter {
return &MockCache_Expecter{mock: &_m.Mock}
}
// AllocID provides a mock function with given fields: ctx
func (_m *MockCache) AllocID(ctx context.Context) (int64, error) {
ret := _m.Called(ctx)
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (int64, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) int64); ok {
r0 = rf(ctx)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockCache_AllocID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocID'
type MockCache_AllocID_Call struct {
*mock.Call
}
// AllocID is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockCache_Expecter) AllocID(ctx interface{}) *MockCache_AllocID_Call {
return &MockCache_AllocID_Call{Call: _e.mock.On("AllocID", ctx)}
}
func (_c *MockCache_AllocID_Call) Run(run func(ctx context.Context)) *MockCache_AllocID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockCache_AllocID_Call) Return(_a0 int64, _a1 error) *MockCache_AllocID_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockCache_AllocID_Call) RunAndReturn(run func(context.Context) (int64, error)) *MockCache_AllocID_Call {
_c.Call.Return(run)
return _c
}
// DeprecateShardCache provides a mock function with given fields: database, collectionName
func (_m *MockCache) DeprecateShardCache(database string, collectionName string) {
_m.Called(database, collectionName)
@ -823,41 +875,6 @@ func (_c *MockCache_RefreshPolicyInfo_Call) RunAndReturn(run func(typeutil.Cache
return _c
}
// RemoveAlias provides a mock function with given fields: ctx, database, alias
func (_m *MockCache) RemoveAlias(ctx context.Context, database string, alias string) {
_m.Called(ctx, database, alias)
}
// MockCache_RemoveAlias_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveAlias'
type MockCache_RemoveAlias_Call struct {
*mock.Call
}
// RemoveAlias is a helper method to define mock.On call
// - ctx context.Context
// - database string
// - alias string
func (_e *MockCache_Expecter) RemoveAlias(ctx interface{}, database interface{}, alias interface{}) *MockCache_RemoveAlias_Call {
return &MockCache_RemoveAlias_Call{Call: _e.mock.On("RemoveAlias", ctx, database, alias)}
}
func (_c *MockCache_RemoveAlias_Call) Run(run func(ctx context.Context, database string, alias string)) *MockCache_RemoveAlias_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string))
})
return _c
}
func (_c *MockCache_RemoveAlias_Call) Return() *MockCache_RemoveAlias_Call {
_c.Call.Return()
return _c
}
func (_c *MockCache_RemoveAlias_Call) RunAndReturn(run func(context.Context, string, string)) *MockCache_RemoveAlias_Call {
_c.Call.Return(run)
return _c
}
// RemoveCollection provides a mock function with given fields: ctx, database, collectionName
func (_m *MockCache) RemoveCollection(ctx context.Context, database string, collectionName string) {
_m.Called(ctx, database, collectionName)

View File

@ -103,6 +103,10 @@ type mockTask struct {
ts Timestamp
}
func (m *mockTask) CanSkipAllocTimestamp() bool {
return false
}
func (m *mockTask) TraceCtx() context.Context {
return m.TaskCondition.ctx
}

View File

@ -112,6 +112,13 @@ type task interface {
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
CanSkipAllocTimestamp() bool
}
type baseTask struct{}
func (bt *baseTask) CanSkipAllocTimestamp() bool {
return false
}
type dmlTask interface {
@ -123,6 +130,7 @@ type dmlTask interface {
type BaseInsertTask = msgstream.InsertMsg
type createCollectionTask struct {
baseTask
Condition
*milvuspb.CreateCollectionRequest
ctx context.Context
@ -361,6 +369,7 @@ func (t *createCollectionTask) PostExecute(ctx context.Context) error {
}
type dropCollectionTask struct {
baseTask
Condition
*milvuspb.DropCollectionRequest
ctx context.Context
@ -430,6 +439,7 @@ func (t *dropCollectionTask) PostExecute(ctx context.Context) error {
}
type hasCollectionTask struct {
baseTask
Condition
*milvuspb.HasCollectionRequest
ctx context.Context
@ -504,6 +514,7 @@ func (t *hasCollectionTask) PostExecute(ctx context.Context) error {
}
type describeCollectionTask struct {
baseTask
Condition
*milvuspb.DescribeCollectionRequest
ctx context.Context
@ -639,6 +650,7 @@ func (t *describeCollectionTask) PostExecute(ctx context.Context) error {
}
type showCollectionsTask struct {
baseTask
Condition
*milvuspb.ShowCollectionsRequest
ctx context.Context
@ -798,6 +810,7 @@ func (t *showCollectionsTask) PostExecute(ctx context.Context) error {
}
type alterCollectionTask struct {
baseTask
Condition
*milvuspb.AlterCollectionRequest
ctx context.Context
@ -882,6 +895,7 @@ func (t *alterCollectionTask) PostExecute(ctx context.Context) error {
}
type createPartitionTask struct {
baseTask
Condition
*milvuspb.CreatePartitionRequest
ctx context.Context
@ -969,6 +983,7 @@ func (t *createPartitionTask) PostExecute(ctx context.Context) error {
}
type dropPartitionTask struct {
baseTask
Condition
*milvuspb.DropPartitionRequest
ctx context.Context
@ -1083,6 +1098,7 @@ func (t *dropPartitionTask) PostExecute(ctx context.Context) error {
}
type hasPartitionTask struct {
baseTask
Condition
*milvuspb.HasPartitionRequest
ctx context.Context
@ -1159,6 +1175,7 @@ func (t *hasPartitionTask) PostExecute(ctx context.Context) error {
}
type showPartitionsTask struct {
baseTask
Condition
*milvuspb.ShowPartitionsRequest
ctx context.Context
@ -1321,6 +1338,7 @@ func (t *showPartitionsTask) PostExecute(ctx context.Context) error {
}
type flushTask struct {
baseTask
Condition
*milvuspb.FlushRequest
ctx context.Context
@ -1422,6 +1440,7 @@ func (t *flushTask) PostExecute(ctx context.Context) error {
}
type loadCollectionTask struct {
baseTask
Condition
*milvuspb.LoadCollectionRequest
ctx context.Context
@ -1579,6 +1598,7 @@ func (t *loadCollectionTask) PostExecute(ctx context.Context) error {
}
type releaseCollectionTask struct {
baseTask
Condition
*milvuspb.ReleaseCollectionRequest
ctx context.Context
@ -1671,6 +1691,7 @@ func (t *releaseCollectionTask) PostExecute(ctx context.Context) error {
}
type loadPartitionsTask struct {
baseTask
Condition
*milvuspb.LoadPartitionsRequest
ctx context.Context
@ -1821,6 +1842,7 @@ func (t *loadPartitionsTask) PostExecute(ctx context.Context) error {
}
type releasePartitionsTask struct {
baseTask
Condition
*milvuspb.ReleasePartitionsRequest
ctx context.Context
@ -1928,6 +1950,7 @@ func (t *releasePartitionsTask) PostExecute(ctx context.Context) error {
}
type CreateResourceGroupTask struct {
baseTask
Condition
*milvuspb.CreateResourceGroupRequest
ctx context.Context
@ -1992,6 +2015,7 @@ func (t *CreateResourceGroupTask) PostExecute(ctx context.Context) error {
}
type DropResourceGroupTask struct {
baseTask
Condition
*milvuspb.DropResourceGroupRequest
ctx context.Context
@ -2056,6 +2080,7 @@ func (t *DropResourceGroupTask) PostExecute(ctx context.Context) error {
}
type DescribeResourceGroupTask struct {
baseTask
Condition
*milvuspb.DescribeResourceGroupRequest
ctx context.Context
@ -2177,6 +2202,7 @@ func (t *DescribeResourceGroupTask) PostExecute(ctx context.Context) error {
}
type TransferNodeTask struct {
baseTask
Condition
*milvuspb.TransferNodeRequest
ctx context.Context
@ -2241,6 +2267,7 @@ func (t *TransferNodeTask) PostExecute(ctx context.Context) error {
}
type TransferReplicaTask struct {
baseTask
Condition
*milvuspb.TransferReplicaRequest
ctx context.Context
@ -2314,6 +2341,7 @@ func (t *TransferReplicaTask) PostExecute(ctx context.Context) error {
}
type ListResourceGroupsTask struct {
baseTask
Condition
*milvuspb.ListResourceGroupsRequest
ctx context.Context

View File

@ -28,6 +28,7 @@ import (
// CreateAliasTask contains task information of CreateAlias
type CreateAliasTask struct {
baseTask
Condition
*milvuspb.CreateAliasRequest
ctx context.Context
@ -115,6 +116,7 @@ func (t *CreateAliasTask) PostExecute(ctx context.Context) error {
// DropAliasTask is the task to drop alias
type DropAliasTask struct {
baseTask
Condition
*milvuspb.DropAliasRequest
ctx context.Context
@ -187,6 +189,7 @@ func (t *DropAliasTask) PostExecute(ctx context.Context) error {
// AlterAliasTask is the task to alter alias
type AlterAliasTask struct {
baseTask
Condition
*milvuspb.AlterAliasRequest
ctx context.Context
@ -263,6 +266,7 @@ func (t *AlterAliasTask) PostExecute(ctx context.Context) error {
// DescribeAliasTask is the task to describe alias
type DescribeAliasTask struct {
baseTask
Condition
nodeID UniqueID
*milvuspb.DescribeAliasRequest
@ -330,6 +334,7 @@ func (a *DescribeAliasTask) PostExecute(ctx context.Context) error {
// ListAliasesTask is the task to list aliases
type ListAliasesTask struct {
baseTask
Condition
nodeID UniqueID
*milvuspb.ListAliasesRequest

View File

@ -12,6 +12,7 @@ import (
)
type createDatabaseTask struct {
baseTask
Condition
*milvuspb.CreateDatabaseRequest
ctx context.Context
@ -80,6 +81,7 @@ func (cdt *createDatabaseTask) PostExecute(ctx context.Context) error {
}
type dropDatabaseTask struct {
baseTask
Condition
*milvuspb.DropDatabaseRequest
ctx context.Context
@ -150,6 +152,7 @@ func (ddt *dropDatabaseTask) PostExecute(ctx context.Context) error {
}
type listDatabaseTask struct {
baseTask
Condition
*milvuspb.ListDatabasesRequest
ctx context.Context

View File

@ -34,6 +34,7 @@ import (
type BaseDeleteTask = msgstream.DeleteMsg
type deleteTask struct {
baseTask
Condition
ctx context.Context
tr *timerecord.TimeRecorder

View File

@ -31,6 +31,7 @@ const (
)
type hybridSearchTask struct {
baseTask
Condition
ctx context.Context
*internalpb.HybridSearchRequest

View File

@ -53,6 +53,7 @@ const (
)
type createIndexTask struct {
baseTask
Condition
req *milvuspb.CreateIndexRequest
ctx context.Context
@ -435,6 +436,7 @@ func (cit *createIndexTask) PostExecute(ctx context.Context) error {
}
type alterIndexTask struct {
baseTask
Condition
req *milvuspb.AlterIndexRequest
ctx context.Context
@ -554,6 +556,7 @@ func (t *alterIndexTask) PostExecute(ctx context.Context) error {
}
type describeIndexTask struct {
baseTask
Condition
*milvuspb.DescribeIndexRequest
ctx context.Context
@ -677,6 +680,7 @@ func (dit *describeIndexTask) PostExecute(ctx context.Context) error {
}
type getIndexStatisticsTask struct {
baseTask
Condition
*milvuspb.GetIndexStatisticsRequest
ctx context.Context
@ -793,6 +797,7 @@ func (dit *getIndexStatisticsTask) PostExecute(ctx context.Context) error {
}
type dropIndexTask struct {
baseTask
Condition
ctx context.Context
*milvuspb.DropIndexRequest
@ -913,6 +918,7 @@ func (dit *dropIndexTask) PostExecute(ctx context.Context) error {
// Deprecated: use describeIndexTask instead
type getIndexBuildProgressTask struct {
baseTask
Condition
*milvuspb.GetIndexBuildProgressRequest
ctx context.Context
@ -1002,6 +1008,7 @@ func (gibpt *getIndexBuildProgressTask) PostExecute(ctx context.Context) error {
// Deprecated: use describeIndexTask instead
type getIndexStateTask struct {
baseTask
Condition
*milvuspb.GetIndexStateRequest
ctx context.Context

View File

@ -22,6 +22,7 @@ import (
)
type insertTask struct {
baseTask
// req *milvuspb.InsertRequest
Condition
insertMsg *BaseInsertTask

View File

@ -242,6 +242,31 @@ func (t *queryTask) createPlan(ctx context.Context) error {
return nil
}
func (t *queryTask) CanSkipAllocTimestamp() bool {
var consistencyLevel commonpb.ConsistencyLevel
useDefaultConsistency := t.request.GetUseDefaultConsistency()
if !useDefaultConsistency {
consistencyLevel = t.request.GetConsistencyLevel()
} else {
collID, err := globalMetaCache.GetCollectionID(context.Background(), t.request.GetDbName(), t.request.GetCollectionName())
if err != nil { // err is not nil if collection not exists
log.Warn("query task get collectionID failed, can't skip alloc timestamp",
zap.String("collectionName", t.request.GetCollectionName()), zap.Error(err))
return false
}
collectionInfo, err2 := globalMetaCache.GetCollectionInfo(context.Background(), t.request.GetDbName(), t.request.GetCollectionName(), collID)
if err2 != nil {
log.Warn("query task get collection info failed, can't skip alloc timestamp",
zap.String("collectionName", t.request.GetCollectionName()), zap.Error(err))
return false
}
consistencyLevel = collectionInfo.consistencyLevel
}
return consistencyLevel != commonpb.ConsistencyLevel_Strong
}
func (t *queryTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_Retrieve
t.Base.SourceID = paramtable.GetNodeID()
@ -367,7 +392,6 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.RetrieveRequest.Username = username
}
t.MvccTimestamp = t.BeginTs()
collectionInfo, err2 := globalMetaCache.GetCollectionInfo(ctx, t.request.GetDbName(), collectionName, t.CollectionID)
if err2 != nil {
log.Warn("Proxy::queryTask::PreExecute failed to GetCollectionInfo from cache",
@ -662,6 +686,9 @@ func (t *queryTask) EndTs() Timestamp {
}
func (t *queryTask) SetTs(ts Timestamp) {
if t.reQuery && t.Base.Timestamp != 0 {
return
}
t.Base.Timestamp = ts
}

View File

@ -957,3 +957,120 @@ func TestQueryTask_IDs2Expr(t *testing.T) {
expectStrExpr := "pk in [ \"a\", \"b\", \"c\" ]"
assert.Equal(t, expectStrExpr, strExpr)
}
func TestQueryTask_CanSkipAllocTimestamp(t *testing.T) {
dbName := "test_query"
collName := "test_skip_alloc_timestamp"
collID := UniqueID(111)
mockMetaCache := NewMockCache(t)
globalMetaCache = mockMetaCache
t.Run("default consistency level", func(t *testing.T) {
qt := &queryTask{
request: &milvuspb.QueryRequest{
Base: nil,
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: true,
},
}
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, nil)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil).Once()
skip := qt.CanSkipAllocTimestamp()
assert.True(t, skip)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Bounded,
}, nil).Once()
skip = qt.CanSkipAllocTimestamp()
assert.True(t, skip)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Strong,
}, nil).Once()
skip = qt.CanSkipAllocTimestamp()
assert.False(t, skip)
})
t.Run("request consistency level", func(t *testing.T) {
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil).Times(3)
qt := &queryTask{
request: &milvuspb.QueryRequest{
Base: nil,
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: false,
ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
},
}
skip := qt.CanSkipAllocTimestamp()
assert.True(t, skip)
qt.request.ConsistencyLevel = commonpb.ConsistencyLevel_Bounded
skip = qt.CanSkipAllocTimestamp()
assert.True(t, skip)
qt.request.ConsistencyLevel = commonpb.ConsistencyLevel_Strong
skip = qt.CanSkipAllocTimestamp()
assert.False(t, skip)
})
t.Run("failed", func(t *testing.T) {
mockMetaCache.ExpectedCalls = nil
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, nil)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
nil, fmt.Errorf("mock error")).Once()
qt := &queryTask{
request: &milvuspb.QueryRequest{
Base: nil,
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: true,
ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
},
}
skip := qt.CanSkipAllocTimestamp()
assert.False(t, skip)
mockMetaCache.ExpectedCalls = nil
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, fmt.Errorf("mock error"))
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil)
skip = qt.CanSkipAllocTimestamp()
assert.False(t, skip)
qt2 := &queryTask{
request: &milvuspb.QueryRequest{
Base: nil,
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: false,
ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
},
}
skip = qt2.CanSkipAllocTimestamp()
assert.True(t, skip)
})
}

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -170,14 +171,24 @@ func (queue *baseTaskQueue) Enqueue(t task) error {
return err
}
ts, err := queue.tsoAllocatorIns.AllocOne(t.TraceCtx())
if err != nil {
return err
var ts Timestamp
var id UniqueID
if t.CanSkipAllocTimestamp() {
ts = tsoutil.ComposeTS(time.Now().UnixMilli(), 0)
id, err = globalMetaCache.AllocID(t.TraceCtx())
if err != nil {
return err
}
} else {
ts, err = queue.tsoAllocatorIns.AllocOne(t.TraceCtx())
if err != nil {
return err
}
// we always use same msg id and ts for now.
id = UniqueID(ts)
}
t.SetTs(ts)
// we always use same msg id and ts for now.
t.SetID(UniqueID(ts))
t.SetID(id)
return queue.addUnissuedTask(t)
}

View File

@ -28,7 +28,9 @@ import (
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
)
@ -602,3 +604,75 @@ func TestTaskScheduler_concurrentPushAndPop(t *testing.T) {
}
wg.Wait()
}
func TestTaskScheduler_SkipAllocTimestamp(t *testing.T) {
dbName := "test_query"
collName := "test_skip_alloc_timestamp"
collID := UniqueID(111)
mockMetaCache := NewMockCache(t)
globalMetaCache = mockMetaCache
tsoAllocatorIns := newMockTsoAllocator()
queue := newBaseTaskQueue(tsoAllocatorIns)
assert.NotNil(t, queue)
assert.True(t, queue.utEmpty())
assert.False(t, queue.utFull())
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, nil)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil)
mockMetaCache.EXPECT().AllocID(mock.Anything).Return(1, nil).Twice()
t.Run("query", func(t *testing.T) {
qt := &queryTask{
RetrieveRequest: &internalpb.RetrieveRequest{
Base: &commonpb.MsgBase{},
},
request: &milvuspb.QueryRequest{
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: true,
},
}
err := queue.Enqueue(qt)
assert.NoError(t, err)
})
t.Run("search", func(t *testing.T) {
st := &searchTask{
SearchRequest: &internalpb.SearchRequest{
Base: &commonpb.MsgBase{},
},
request: &milvuspb.SearchRequest{
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: true,
},
}
err := queue.Enqueue(st)
assert.NoError(t, err)
})
mockMetaCache.EXPECT().AllocID(mock.Anything).Return(0, fmt.Errorf("mock error")).Once()
t.Run("failed", func(t *testing.T) {
st := &searchTask{
SearchRequest: &internalpb.SearchRequest{
Base: &commonpb.MsgBase{},
},
request: &milvuspb.SearchRequest{
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: true,
},
}
err := queue.Enqueue(st)
assert.Error(t, err)
})
}

View File

@ -236,6 +236,31 @@ func getNq(req *milvuspb.SearchRequest) (int64, error) {
return req.GetNq(), nil
}
func (t *searchTask) CanSkipAllocTimestamp() bool {
var consistencyLevel commonpb.ConsistencyLevel
useDefaultConsistency := t.request.GetUseDefaultConsistency()
if !useDefaultConsistency {
consistencyLevel = t.request.GetConsistencyLevel()
} else {
collID, err := globalMetaCache.GetCollectionID(context.Background(), t.request.GetDbName(), t.request.GetCollectionName())
if err != nil { // err is not nil if collection not exists
log.Warn("search task get collectionID failed, can't skip alloc timestamp",
zap.String("collectionName", t.request.GetCollectionName()), zap.Error(err))
return false
}
collectionInfo, err2 := globalMetaCache.GetCollectionInfo(context.Background(), t.request.GetDbName(), t.request.GetCollectionName(), collID)
if err2 != nil {
log.Warn("search task get collection info failed, can't skip alloc timestamp",
zap.String("collectionName", t.request.GetCollectionName()), zap.Error(err))
return false
}
consistencyLevel = collectionInfo.consistencyLevel
}
return consistencyLevel != commonpb.ConsistencyLevel_Strong
}
func (t *searchTask) PreExecute(ctx context.Context) error {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search-PreExecute")
defer sp.End()
@ -499,7 +524,8 @@ func (t *searchTask) estimateResultSize(nq int64, topK int64) (int64, error) {
func (t *searchTask) Requery() error {
queryReq := &milvuspb.QueryRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Retrieve,
MsgType: commonpb.MsgType_Retrieve,
Timestamp: t.BeginTs(),
},
DbName: t.request.GetDbName(),
CollectionName: t.request.GetCollectionName(),

View File

@ -2308,3 +2308,120 @@ func (s *GetPartitionIDsSuite) TestRegexpPartitionNames() {
func TestGetPartitionIDs(t *testing.T) {
suite.Run(t, new(GetPartitionIDsSuite))
}
func TestSearchTask_CanSkipAllocTimestamp(t *testing.T) {
dbName := "test_query"
collName := "test_skip_alloc_timestamp"
collID := UniqueID(111)
mockMetaCache := NewMockCache(t)
globalMetaCache = mockMetaCache
t.Run("default consistency level", func(t *testing.T) {
st := &searchTask{
request: &milvuspb.SearchRequest{
Base: nil,
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: true,
},
}
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, nil)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil).Once()
skip := st.CanSkipAllocTimestamp()
assert.True(t, skip)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Bounded,
}, nil).Once()
skip = st.CanSkipAllocTimestamp()
assert.True(t, skip)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Strong,
}, nil).Once()
skip = st.CanSkipAllocTimestamp()
assert.False(t, skip)
})
t.Run("request consistency level", func(t *testing.T) {
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil).Times(3)
st := &searchTask{
request: &milvuspb.SearchRequest{
Base: nil,
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: false,
ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
},
}
skip := st.CanSkipAllocTimestamp()
assert.True(t, skip)
st.request.ConsistencyLevel = commonpb.ConsistencyLevel_Bounded
skip = st.CanSkipAllocTimestamp()
assert.True(t, skip)
st.request.ConsistencyLevel = commonpb.ConsistencyLevel_Strong
skip = st.CanSkipAllocTimestamp()
assert.False(t, skip)
})
t.Run("failed", func(t *testing.T) {
mockMetaCache.ExpectedCalls = nil
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, nil)
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
nil, fmt.Errorf("mock error")).Once()
st := &searchTask{
request: &milvuspb.SearchRequest{
Base: nil,
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: true,
ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
},
}
skip := st.CanSkipAllocTimestamp()
assert.False(t, skip)
mockMetaCache.ExpectedCalls = nil
mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(collID, fmt.Errorf("mock error"))
mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: collID,
consistencyLevel: commonpb.ConsistencyLevel_Eventually,
}, nil)
skip = st.CanSkipAllocTimestamp()
assert.False(t, skip)
st2 := &searchTask{
request: &milvuspb.SearchRequest{
Base: nil,
DbName: dbName,
CollectionName: collName,
UseDefaultConsistency: false,
ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
},
}
skip = st2.CanSkipAllocTimestamp()
assert.True(t, skip)
})
}

View File

@ -34,6 +34,7 @@ const (
type getStatisticsTask struct {
request *milvuspb.GetStatisticsRequest
result *milvuspb.GetStatisticsResponse
baseTask
Condition
collectionName string
partitionNames []string
@ -590,6 +591,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP
// old version of get statistics
// please remove it after getStatisticsTask below is stable
type getCollectionStatisticsTask struct {
baseTask
Condition
*milvuspb.GetCollectionStatisticsRequest
ctx context.Context
@ -675,6 +677,7 @@ func (g *getCollectionStatisticsTask) PostExecute(ctx context.Context) error {
}
type getPartitionStatisticsTask struct {
baseTask
Condition
*milvuspb.GetPartitionStatisticsRequest
ctx context.Context

View File

@ -41,6 +41,7 @@ import (
)
type upsertTask struct {
baseTask
Condition
upsertMsg *msgstream.UpsertMsg