enhance: generate guaranteets at delegator if local wal (#39799)

issue: #38399, #39892

- use mvcc timestamp of wal as guaranteets if wal and delegator is
located at same node.
- fix: ignore growing option is lost at hibridsearch

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/39507/head
Zhen Ye 2025-02-17 15:22:15 +08:00 committed by GitHub
parent 68346ee2b5
commit 21724ab52c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 1169 additions and 564 deletions

View File

@ -84,6 +84,10 @@ type WALAccesser interface {
// WALName returns the name of the wal.
WALName() string
// GetLatestMVCCTimestampIfLocal gets the latest mvcc timestamp of the vchannel.
// If the wal is located at remote, it will return 0, error.
GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error)
// Txn returns a transaction for writing records to one vchannel.
// It promises the atomicity written of the messages.
// Once the txn is returned, the Commit or Rollback operation must be called once, otherwise resource leak on wal.

View File

@ -64,6 +64,15 @@ func (w *walAccesserImpl) WALName() string {
return util.MustSelectWALName()
}
func (w *walAccesserImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) {
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return 0, ErrWALAccesserClosed
}
defer w.lifetime.Done()
return w.handlerClient.GetLatestMVCCTimestampIfLocal(ctx, vchannel)
}
// RawAppend writes a record to the log.
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) {
assertValidMessage(msg)

View File

@ -147,6 +147,8 @@ func TestWAL(t *testing.T) {
w.Close()
w.GetLatestMVCCTimestampIfLocal(ctx, vChannel1)
resp = w.AppendMessages(ctx, newInsertMessage(vChannel1))
assert.Error(t, resp.UnwrapFirstError())

View File

@ -196,6 +196,63 @@ func (_c *MockWALAccesser_Broadcast_Call) RunAndReturn(run func() streaming.Broa
return _c
}
// GetLatestMVCCTimestampIfLocal provides a mock function with given fields: ctx, vchannel
func (_m *MockWALAccesser) GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) {
ret := _m.Called(ctx, vchannel)
if len(ret) == 0 {
panic("no return value specified for GetLatestMVCCTimestampIfLocal")
}
var r0 uint64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (uint64, error)); ok {
return rf(ctx, vchannel)
}
if rf, ok := ret.Get(0).(func(context.Context, string) uint64); ok {
r0 = rf(ctx, vchannel)
} else {
r0 = ret.Get(0).(uint64)
}
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, vchannel)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMVCCTimestampIfLocal'
type MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call struct {
*mock.Call
}
// GetLatestMVCCTimestampIfLocal is a helper method to define mock.On call
// - ctx context.Context
// - vchannel string
func (_e *MockWALAccesser_Expecter) GetLatestMVCCTimestampIfLocal(ctx interface{}, vchannel interface{}) *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call {
return &MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call{Call: _e.mock.On("GetLatestMVCCTimestampIfLocal", ctx, vchannel)}
}
func (_c *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call) Run(run func(ctx context.Context, vchannel string)) *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call) Return(_a0 uint64, _a1 error) *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call) RunAndReturn(run func(context.Context, string) (uint64, error)) *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call {
_c.Call.Return(run)
return _c
}
// RawAppend provides a mock function with given fields: ctx, msgs, opts
func (_m *MockWALAccesser) RawAppend(ctx context.Context, msgs message.MutableMessage, opts ...streaming.AppendOption) (*types.AppendResult, error) {
_va := make([]interface{}, len(opts))

View File

@ -177,6 +177,63 @@ func (_c *MockHandlerClient_CreateProducer_Call) RunAndReturn(run func(context.C
return _c
}
// GetLatestMVCCTimestampIfLocal provides a mock function with given fields: ctx, vchannel
func (_m *MockHandlerClient) GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) {
ret := _m.Called(ctx, vchannel)
if len(ret) == 0 {
panic("no return value specified for GetLatestMVCCTimestampIfLocal")
}
var r0 uint64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (uint64, error)); ok {
return rf(ctx, vchannel)
}
if rf, ok := ret.Get(0).(func(context.Context, string) uint64); ok {
r0 = rf(ctx, vchannel)
} else {
r0 = ret.Get(0).(uint64)
}
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, vchannel)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMVCCTimestampIfLocal'
type MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call struct {
*mock.Call
}
// GetLatestMVCCTimestampIfLocal is a helper method to define mock.On call
// - ctx context.Context
// - vchannel string
func (_e *MockHandlerClient_Expecter) GetLatestMVCCTimestampIfLocal(ctx interface{}, vchannel interface{}) *MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call {
return &MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call{Call: _e.mock.On("GetLatestMVCCTimestampIfLocal", ctx, vchannel)}
}
func (_c *MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call) Run(run func(ctx context.Context, vchannel string)) *MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call) Return(_a0 uint64, _a1 error) *MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call) RunAndReturn(run func(context.Context, string) (uint64, error)) *MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call {
_c.Call.Return(run)
return _c
}
// NewMockHandlerClient creates a new instance of MockHandlerClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockHandlerClient(t interface {

View File

@ -244,6 +244,63 @@ func (_c *MockWAL_Close_Call) RunAndReturn(run func()) *MockWAL_Close_Call {
return _c
}
// GetLatestMVCCTimestamp provides a mock function with given fields: ctx, vchannel
func (_m *MockWAL) GetLatestMVCCTimestamp(ctx context.Context, vchannel string) (uint64, error) {
ret := _m.Called(ctx, vchannel)
if len(ret) == 0 {
panic("no return value specified for GetLatestMVCCTimestamp")
}
var r0 uint64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (uint64, error)); ok {
return rf(ctx, vchannel)
}
if rf, ok := ret.Get(0).(func(context.Context, string) uint64); ok {
r0 = rf(ctx, vchannel)
} else {
r0 = ret.Get(0).(uint64)
}
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, vchannel)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockWAL_GetLatestMVCCTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMVCCTimestamp'
type MockWAL_GetLatestMVCCTimestamp_Call struct {
*mock.Call
}
// GetLatestMVCCTimestamp is a helper method to define mock.On call
// - ctx context.Context
// - vchannel string
func (_e *MockWAL_Expecter) GetLatestMVCCTimestamp(ctx interface{}, vchannel interface{}) *MockWAL_GetLatestMVCCTimestamp_Call {
return &MockWAL_GetLatestMVCCTimestamp_Call{Call: _e.mock.On("GetLatestMVCCTimestamp", ctx, vchannel)}
}
func (_c *MockWAL_GetLatestMVCCTimestamp_Call) Run(run func(ctx context.Context, vchannel string)) *MockWAL_GetLatestMVCCTimestamp_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *MockWAL_GetLatestMVCCTimestamp_Call) Return(_a0 uint64, _a1 error) *MockWAL_GetLatestMVCCTimestamp_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockWAL_GetLatestMVCCTimestamp_Call) RunAndReturn(run func(context.Context, string) (uint64, error)) *MockWAL_GetLatestMVCCTimestamp_Call {
_c.Call.Return(run)
return _c
}
// IsAvailable provides a mock function with given fields:
func (_m *MockWAL) IsAvailable() bool {
ret := _m.Called()

View File

@ -7,6 +7,8 @@ import (
mock "github.com/stretchr/testify/mock"
mvcc "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/mvcc"
types "github.com/milvus-io/milvus/pkg/streaming/util/types"
wab "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
@ -70,6 +72,64 @@ func (_c *MockTimeTickSyncOperator_Channel_Call) RunAndReturn(run func() types.P
return _c
}
// MVCCManager provides a mock function with given fields: ctx
func (_m *MockTimeTickSyncOperator) MVCCManager(ctx context.Context) (*mvcc.MVCCManager, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for MVCCManager")
}
var r0 *mvcc.MVCCManager
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*mvcc.MVCCManager, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) *mvcc.MVCCManager); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*mvcc.MVCCManager)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockTimeTickSyncOperator_MVCCManager_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MVCCManager'
type MockTimeTickSyncOperator_MVCCManager_Call struct {
*mock.Call
}
// MVCCManager is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockTimeTickSyncOperator_Expecter) MVCCManager(ctx interface{}) *MockTimeTickSyncOperator_MVCCManager_Call {
return &MockTimeTickSyncOperator_MVCCManager_Call{Call: _e.mock.On("MVCCManager", ctx)}
}
func (_c *MockTimeTickSyncOperator_MVCCManager_Call) Run(run func(ctx context.Context)) *MockTimeTickSyncOperator_MVCCManager_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockTimeTickSyncOperator_MVCCManager_Call) Return(_a0 *mvcc.MVCCManager, _a1 error) *MockTimeTickSyncOperator_MVCCManager_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockTimeTickSyncOperator_MVCCManager_Call) RunAndReturn(run func(context.Context) (*mvcc.MVCCManager, error)) *MockTimeTickSyncOperator_MVCCManager_Call {
_c.Call.Return(run)
return _c
}
// Sync provides a mock function with given fields: ctx
func (_m *MockTimeTickSyncOperator) Sync(ctx context.Context) {
_m.Called(ctx)

View File

@ -2747,3 +2747,17 @@ func (t *ListResourceGroupsTask) Execute(ctx context.Context) error {
func (t *ListResourceGroupsTask) PostExecute(ctx context.Context) error {
return nil
}
// isIgnoreGrowing is used to check if the request should ignore growing
func isIgnoreGrowing(params []*commonpb.KeyValuePair) (bool, error) {
for _, kv := range params {
if kv.GetKey() == IgnoreGrowingKey {
ignoreGrowing, err := strconv.ParseBool(kv.GetValue())
if err != nil {
return false, errors.New("parse ignore growing field failed")
}
return ignoreGrowing, nil
}
}
return false, nil
}

View File

@ -359,18 +359,9 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
log.Debug("Validate partition names.")
// fetch search_growing from query param
var ignoreGrowing bool
for i, kv := range t.request.GetQueryParams() {
if kv.GetKey() == IgnoreGrowingKey {
ignoreGrowing, err = strconv.ParseBool(kv.Value)
if err != nil {
return errors.New("parse search growing failed")
}
t.request.QueryParams = append(t.request.GetQueryParams()[:i], t.request.GetQueryParams()[i+1:]...)
break
}
if t.RetrieveRequest.IgnoreGrowing, err = isIgnoreGrowing(t.request.GetQueryParams()); err != nil {
return err
}
t.RetrieveRequest.IgnoreGrowing = ignoreGrowing
queryParams, err := parseQueryParams(t.request.GetQueryParams())
if err != nil {
@ -463,6 +454,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
guaranteeTs := t.request.GetGuaranteeTimestamp()
var consistencyLevel commonpb.ConsistencyLevel
useDefaultConsistency := t.request.GetUseDefaultConsistency()
t.RetrieveRequest.ConsistencyLevel = t.request.GetConsistencyLevel()
if useDefaultConsistency {
consistencyLevel = collectionInfo.consistencyLevel
guaranteeTs = parseGuaranteeTsFromConsistency(guaranteeTs, t.BeginTs(), consistencyLevel)
@ -482,6 +474,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.MvccTimestamp = t.request.GetGuaranteeTimestamp()
t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp()
}
t.RetrieveRequest.IsIterator = queryParams.isIterator
deadline, ok := t.TraceCtx().Deadline()
if ok {

View File

@ -193,19 +193,9 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
}
t.SearchRequest.Nq = nq
var ignoreGrowing bool
// parse common search params
for i, kv := range t.request.GetSearchParams() {
if kv.GetKey() == IgnoreGrowingKey {
ignoreGrowing, err = strconv.ParseBool(kv.GetValue())
if err != nil {
return errors.New("parse search growing failed")
}
t.request.SearchParams = append(t.request.GetSearchParams()[:i], t.request.GetSearchParams()[i+1:]...)
break
}
if t.SearchRequest.IgnoreGrowing, err = isIgnoreGrowing(t.request.SearchParams); err != nil {
return err
}
t.SearchRequest.IgnoreGrowing = ignoreGrowing
outputFieldIDs, err := getOutputFieldIDs(t.schema, t.request.GetOutputFields())
if err != nil {
@ -260,6 +250,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
t.MvccTimestamp = t.request.GetGuaranteeTimestamp()
t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp()
}
t.SearchRequest.IsIterator = t.isIterator
if deadline, ok := t.TraceCtx().Deadline(); ok {
t.SearchRequest.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0)
@ -370,6 +361,14 @@ func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error {
return err
}
ignoreGrowing := t.SearchRequest.IgnoreGrowing
if !ignoreGrowing {
// fetch ignore_growing from sub search param if not set in search request
if ignoreGrowing, err = isIgnoreGrowing(subReq.GetSearchParams()); err != nil {
return err
}
}
internalSubReq := &internalpb.SubSearchRequest{
Dsl: subReq.GetDsl(),
PlaceholderGroup: subReq.GetPlaceholderGroup(),
@ -382,6 +381,7 @@ func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error {
MetricType: queryInfo.GetMetricType(),
GroupByFieldId: t.rankParams.GetGroupByFieldId(),
GroupSize: t.rankParams.GetGroupSize(),
IgnoreGrowing: ignoreGrowing,
}
internalSubReq.FieldId = queryInfo.GetQueryFieldId()

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/delegator/deletebuffer"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
@ -43,6 +44,7 @@ import (
"github.com/milvus-io/milvus/internal/util/function"
"github.com/milvus-io/milvus/internal/util/reduce"
"github.com/milvus-io/milvus/internal/util/searchutil/optimizers"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -349,6 +351,14 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest
return nil, fmt.Errorf("dml channel not match, delegator channel %s, search channels %v", sd.vchannelName, req.GetDmlChannels())
}
req.Req.GuaranteeTimestamp = sd.speedupGuranteeTS(
ctx,
req.Req.GetConsistencyLevel(),
req.Req.GetGuaranteeTimestamp(),
req.Req.GetMvccTimestamp(),
req.Req.GetIsIterator(),
)
// wait tsafe
waitTr := timerecord.NewTimeRecorder("wait tSafe")
tSafe, err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp)
@ -390,13 +400,14 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest
Nq: subReq.GetNq(),
Topk: subReq.GetTopk(),
MetricType: subReq.GetMetricType(),
IgnoreGrowing: req.GetReq().GetIgnoreGrowing(),
IgnoreGrowing: subReq.GetIgnoreGrowing(),
Username: req.GetReq().GetUsername(),
IsAdvanced: false,
GroupByFieldId: subReq.GetGroupByFieldId(),
GroupSize: subReq.GetGroupSize(),
FieldId: subReq.GetFieldId(),
IsTopkReduce: req.GetReq().GetIsTopkReduce(),
IsIterator: req.GetReq().GetIsIterator(),
}
future := conc.Go(func() (*internalpb.SearchResults, error) {
searchReq := &querypb.SearchRequest{
@ -457,6 +468,14 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq
return fmt.Errorf("dml channel not match, delegator channel %s, search channels %v", sd.vchannelName, req.GetDmlChannels())
}
req.Req.GuaranteeTimestamp = sd.speedupGuranteeTS(
ctx,
req.Req.GetConsistencyLevel(),
req.Req.GetGuaranteeTimestamp(),
req.Req.GetMvccTimestamp(),
req.Req.GetIsIterator(),
)
// wait tsafe
waitTr := timerecord.NewTimeRecorder("wait tSafe")
tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp())
@ -520,6 +539,14 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)
return nil, fmt.Errorf("dml channel not match, delegator channel %s, search channels %v", sd.vchannelName, req.GetDmlChannels())
}
req.Req.GuaranteeTimestamp = sd.speedupGuranteeTS(
ctx,
req.Req.GetConsistencyLevel(),
req.Req.GetGuaranteeTimestamp(),
req.Req.GetMvccTimestamp(),
req.Req.GetIsIterator(),
)
// wait tsafe
waitTr := timerecord.NewTimeRecorder("wait tSafe")
tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp())
@ -738,6 +765,28 @@ func executeSubTasks[T any, R interface {
return results, nil
}
// speedupGuranteeTS returns the guarantee timestamp for strong consistency search.
// TODO: we just make a speedup right now, but in the future, we will make the mvcc and guarantee timestamp same.
func (sd *shardDelegator) speedupGuranteeTS(
ctx context.Context,
cl commonpb.ConsistencyLevel,
guaranteeTS uint64,
mvccTS uint64,
isIterator bool,
) uint64 {
// when 1. streaming service is disable,
// 2. consistency level is not strong,
// 3. cannot speed iterator, because current client of milvus doesn't support shard level mvcc.
if !streamingutil.IsStreamingServiceEnabled() || isIterator || cl != commonpb.ConsistencyLevel_Strong || mvccTS != 0 {
return guaranteeTS
}
// use the mvcc timestamp of the wal as the guarantee timestamp to make fast strong consistency search.
if mvcc, err := streaming.WAL().GetLatestMVCCTimestampIfLocal(ctx, sd.vchannelName); err == nil && mvcc < guaranteeTS {
return mvcc
}
return guaranteeTS
}
// waitTSafe returns when tsafe listener notifies a timestamp which meet the guarantee ts.
func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, error) {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, "Delegator-waitTSafe")

View File

@ -28,8 +28,9 @@ import (
)
var (
_ HandlerClient = (*handlerClientImpl)(nil)
ErrClientClosed = errors.New("handler client is closed")
_ HandlerClient = (*handlerClientImpl)(nil)
ErrClientClosed = errors.New("handler client is closed")
ErrClientAssignmentNotReady = errors.New("handler client assignment not ready")
)
type (
@ -65,6 +66,10 @@ type ConsumerOptions struct {
// HandlerClient wraps the PChannel Assignment Service Discovery.
// Provides the ability to create pchannel-level producer and consumer.
type HandlerClient interface {
// GetLatestMVCCTimestampIfLocal gets the latest mvcc timestamp of the vchannel.
// If the wal is located at remote, it will return 0, error.
GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error)
// CreateProducer creates a producer.
// Producer is a stream client without keep alive promise.
// It will be available until context canceled, active close, streaming error or remote server wal closing.

View File

@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -39,6 +40,28 @@ type handlerClientImpl struct {
newConsumer func(ctx context.Context, opts *consumer.ConsumerOptions, handlerClient streamingpb.StreamingNodeHandlerServiceClient) (Consumer, error)
}
// GetLatestMVCCTimestampIfLocal gets the latest mvcc timestamp of the vchannel.
func (hc *handlerClientImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) {
if !hc.lifetime.Add(typeutil.LifetimeStateWorking) {
return 0, ErrClientClosed
}
defer hc.lifetime.Done()
pchannel := funcutil.ToPhysicalChannel(vchannel)
// Get current assignment of pchannel.
assign := hc.watcher.Get(ctx, pchannel)
if assign == nil {
return 0, ErrClientAssignmentNotReady
}
// Get the wal at local registry.
w, err := registry.GetAvailableWAL(assign.Channel)
if err != nil {
return 0, err
}
return w.GetLatestMVCCTimestamp(ctx, vchannel)
}
// CreateProducer creates a producer.
func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerOptions) (Producer, error) {
if !hc.lifetime.Add(typeutil.LifetimeStateWorking) {

View File

@ -91,6 +91,7 @@ func TestHandlerClient(t *testing.T) {
producer2.Close()
producer3.Close()
handler.GetLatestMVCCTimestampIfLocal(ctx, "pchannel")
producer4, err := handler.CreateProducer(ctx, &ProducerOptions{PChannel: "pchannel"})
assert.NoError(t, err)
assert.NotNil(t, producer4)
@ -121,6 +122,8 @@ func TestHandlerClient(t *testing.T) {
assert.Error(t, err)
assert.ErrorIs(t, err, ErrClientClosed)
assert.Nil(t, consumer)
handler.GetLatestMVCCTimestampIfLocal(ctx, "pchannel")
}
func TestDial(t *testing.T) {

View File

@ -85,6 +85,26 @@ func (w *walAdaptorImpl) Channel() types.PChannelInfo {
return w.inner.Channel()
}
// GetLatestMVCCTimestamp get the latest mvcc timestamp of the wal at vchannel.
func (w *walAdaptorImpl) GetLatestMVCCTimestamp(ctx context.Context, vchannel string) (uint64, error) {
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return 0, status.NewOnShutdownError("wal is on shutdown")
}
defer w.lifetime.Done()
operator := resource.Resource().TimeTickInspector().MustGetOperator(w.inner.Channel())
mvccManager, err := operator.MVCCManager(ctx)
if err != nil {
// Unreachable code forever.
return 0, err
}
currentMVCC := mvccManager.GetMVCCOfVChannel(vchannel)
if !currentMVCC.Confirmed {
// if the mvcc is not confirmed, trigger a sync operation to make it confirmed as soon as possible.
resource.Resource().TimeTickInspector().TriggerSync(w.inner.Channel())
}
return currentMVCC.Timetick, nil
}
// Append writes a record to the log.
func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) (*wal.AppendResult, error) {
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {

View File

@ -159,8 +159,12 @@ func (f *testOneWALFramework) testReadAndWrite(ctx context.Context, w wal.WAL) {
var newWritten []message.ImmutableMessage
var read1, read2 []message.ImmutableMessage
appendDone := make(chan struct{})
go func() {
defer wg.Done()
defer func() {
close(appendDone)
wg.Done()
}()
var err error
newWritten, err = f.testAppend(ctx, w)
assert.NoError(f.t, err)
@ -178,6 +182,7 @@ func (f *testOneWALFramework) testReadAndWrite(ctx context.Context, w wal.WAL) {
assert.NoError(f.t, err)
}()
wg.Wait()
// read result should be sorted by timetick.
f.assertSortByTimeTickMessageList(read1)
f.assertSortByTimeTickMessageList(read2)

View File

@ -3,6 +3,7 @@ package inspector
import (
"context"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/mvcc"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)
@ -11,6 +12,9 @@ type TimeTickSyncOperator interface {
// Channel returns the pchannel info.
Channel() types.PChannelInfo
// MVCCManager returns the related mvcc timestamp manager of current wal.
MVCCManager(ctx context.Context) (*mvcc.MVCCManager, error)
// WriteAheadBuffer get the related WriteAhead buffer.
WriteAheadBuffer(ctx context.Context) (wab.ROWriteAheadBuffer, error)

View File

@ -0,0 +1,94 @@
package mvcc
import (
"sync"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
// NewMVCCManager creates a new mvcc timestamp manager.
func NewMVCCManager(lastConfirmedTimeTick uint64) *MVCCManager {
return &MVCCManager{
pchannelMVCCTimestamp: lastConfirmedTimeTick,
vchannelMVCCTimestamps: make(map[string]uint64),
}
}
// MVCCManager is the manager that manages all the mvcc state of one wal.
// It keeps the last confirmed timestamp as mvcc of one pchannel and maximum timetick persisted into the wal of each vchannel.
type MVCCManager struct {
mu sync.Mutex
pchannelMVCCTimestamp uint64 // the last confirmed timetick of the pchannel.
vchannelMVCCTimestamps map[string]uint64 // map the vchannel to the maximum timetick that is persisted into the wal.
}
// GetMVCCOfVChannel gets the mvcc of the vchannel.
func (cm *MVCCManager) GetMVCCOfVChannel(vchannel string) VChannelMVCC {
cm.mu.Lock()
defer cm.mu.Unlock()
if tt, ok := cm.vchannelMVCCTimestamps[vchannel]; ok {
return VChannelMVCC{
Timetick: tt,
Confirmed: false,
}
}
// if the mvcc of vchannel is not found which means that
// the mvcc of vchannel is equal to the pchannel level lastConfirmedTimeTick.
// and that mvcc timestamp is already confirmed.
return VChannelMVCC{
Timetick: cm.pchannelMVCCTimestamp,
Confirmed: true,
}
}
// UpdateMVCC updates the mvcc state by incoming message.
func (cm *MVCCManager) UpdateMVCC(msg message.MutableMessage) {
tt := msg.TimeTick()
msgType := msg.MessageType()
vchannel := msg.VChannel()
isTxn := msg.TxnContext() != nil
cm.mu.Lock()
defer cm.mu.Unlock()
if tt <= cm.pchannelMVCCTimestamp {
return
}
if msgType == message.MessageTypeTimeTick {
cm.sync(tt)
return
}
// If the message is belong to a transaction.
// the mvcc timestamp cannot be push forward if the message is not committed.
// because of an unconfirmed transaction may be rollback and cannot be seen at read side.
if isTxn && msgType != message.MessageTypeCommitTxn {
return
}
if exists, ok := cm.vchannelMVCCTimestamps[vchannel]; !ok || exists < tt {
cm.vchannelMVCCTimestamps[vchannel] = tt
}
}
// sync syncs the mvcc state by the incoming timetick message, push forward the wal mvcc state
// and clear the vchannel mvcc state.
func (cm *MVCCManager) sync(tt uint64) {
// clear all existing vchannel mvcc that are are confirmed.
// which means the mvcc of vchannel is equal to the pchannel level mvcc.
for vchannel, existTimeTick := range cm.vchannelMVCCTimestamps {
if existTimeTick <= tt {
delete(cm.vchannelMVCCTimestamps, vchannel)
}
}
cm.pchannelMVCCTimestamp = tt
}
// VChannelMVCC is a mvcc of one vchannel
// which is used to identify the maximum timetick that is persisted into the wal of one vchannel.
// The state of mvcc that is confirmed if the timetick is synced by timeticksync message,
// otherwise, the mvcc is not confirmed.
type VChannelMVCC struct {
Timetick uint64 // the timetick of the mvcc.
Confirmed bool // the mvcc is confirmed by the timeticksync operation.
}

View File

@ -0,0 +1,71 @@
package mvcc
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
func TestNewMVCCManager(t *testing.T) {
cm := NewMVCCManager(100)
v := cm.GetMVCCOfVChannel("vc1")
assert.Equal(t, v, VChannelMVCC{Timetick: 100, Confirmed: true})
cm.UpdateMVCC(createTestMessage(t, 101, "vc1", message.MessageTypeInsert, false))
v = cm.GetMVCCOfVChannel("vc1")
assert.Equal(t, v, VChannelMVCC{Timetick: 101, Confirmed: false})
v = cm.GetMVCCOfVChannel("vc2")
assert.Equal(t, v, VChannelMVCC{Timetick: 100, Confirmed: true})
cm.UpdateMVCC(createTestMessage(t, 102, "", message.MessageTypeTimeTick, false))
v = cm.GetMVCCOfVChannel("vc1")
assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true})
v = cm.GetMVCCOfVChannel("vc2")
assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true})
cm.UpdateMVCC(createTestMessage(t, 103, "vc1", message.MessageTypeInsert, true))
v = cm.GetMVCCOfVChannel("vc1")
assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true})
v = cm.GetMVCCOfVChannel("vc2")
assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true})
cm.UpdateMVCC(createTestMessage(t, 104, "vc1", message.MessageTypeCommitTxn, true))
v = cm.GetMVCCOfVChannel("vc1")
assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: false})
v = cm.GetMVCCOfVChannel("vc2")
assert.Equal(t, v, VChannelMVCC{Timetick: 102, Confirmed: true})
cm.UpdateMVCC(createTestMessage(t, 104, "", message.MessageTypeTimeTick, false))
v = cm.GetMVCCOfVChannel("vc1")
assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true})
v = cm.GetMVCCOfVChannel("vc2")
assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true})
cm.UpdateMVCC(createTestMessage(t, 101, "", message.MessageTypeTimeTick, false))
v = cm.GetMVCCOfVChannel("vc1")
assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true})
v = cm.GetMVCCOfVChannel("vc2")
assert.Equal(t, v, VChannelMVCC{Timetick: 104, Confirmed: true})
}
func createTestMessage(
t *testing.T,
tt uint64,
vchannel string,
msgType message.MessageType,
txTxn bool,
) message.MutableMessage {
msg := mock_message.NewMockMutableMessage(t)
msg.EXPECT().TimeTick().Return(tt)
msg.EXPECT().VChannel().Return(vchannel)
msg.EXPECT().MessageType().Return(msgType)
if txTxn {
msg.EXPECT().TxnContext().Return(&message.TxnContext{})
return msg
}
msg.EXPECT().TxnContext().Return(nil)
return msg
}

View File

@ -34,16 +34,26 @@ func (impl *timeTickAppendInterceptor) Ready() <-chan struct{} {
// Do implements AppendInterceptor.
func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) {
// the cursor manager should beready since the timetick interceptor is ready.
cm, _ := impl.operator.MVCCManager(ctx)
defer func() {
if err == nil {
cm.UpdateMVCC(msg)
}
}()
ackManager := impl.operator.AckManager()
var txnSession *txn.TxnSession
if msg.MessageType() != message.MessageTypeTimeTick {
// Allocate new timestamp acker for message.
var acker *ack.Acker
if msg.BarrierTimeTick() == 0 {
if acker, err = impl.operator.AckManager().Allocate(ctx); err != nil {
if acker, err = ackManager.Allocate(ctx); err != nil {
return nil, errors.Wrap(err, "allocate timestamp failed")
}
} else {
if acker, err = impl.operator.AckManager().AllocateWithBarrier(ctx, msg.BarrierTimeTick()); err != nil {
if acker, err = ackManager.AllocateWithBarrier(ctx, msg.BarrierTimeTick()); err != nil {
return nil, errors.Wrap(err, "allocate timestamp with barrier failed")
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/mvcc"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
@ -57,6 +58,7 @@ type timeTickSyncOperator struct {
ackDetails *ack.AckDetails // all acknowledged details, all acked messages but not sent to wal will be kept here.
sourceID int64 // the current node id.
writeAheadBuffer *wab.WriteAheadBuffer // write ahead buffer.
mvccManager *mvcc.MVCCManager // cursor manager is used to record the maximum presisted timetick of vchannel.
metrics *metricsutil.TimeTickMetrics
}
@ -68,11 +70,24 @@ func (impl *timeTickSyncOperator) WriteAheadBuffer(ctx context.Context) (wab.ROW
case <-impl.ready:
}
if impl.writeAheadBuffer == nil {
panic("unreachable write ahead buffer is not ready")
panic("unreachable: write ahead buffer is not ready")
}
return impl.writeAheadBuffer, nil
}
// MVCCManager returns the mvcc manager.
func (impl *timeTickSyncOperator) MVCCManager(ctx context.Context) (*mvcc.MVCCManager, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-impl.ready:
}
if impl.mvccManager == nil {
panic("unreachable: mvcc manager is not ready")
}
return impl.mvccManager, nil
}
// Channel returns the pchannel info.
func (impl *timeTickSyncOperator) Channel() types.PChannelInfo {
return impl.pchannel
@ -175,6 +190,7 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
keepalive,
msg.IntoImmutableMessage(msgID),
)
impl.mvccManager = mvcc.NewMVCCManager(ts)
break
}
// interceptor is ready now.

View File

@ -14,6 +14,9 @@ type AppendResult = types.AppendResult
type WAL interface {
WALName() string
// GetLatestMVCCTimestamp get the latest mvcc timestamp of the wal at vchannel.
GetLatestMVCCTimestamp(ctx context.Context, vchannel string) (uint64, error)
// Channel returns the channel assignment info of the wal.
Channel() types.PChannelInfo

View File

@ -98,6 +98,7 @@ message SubSearchRequest {
int64 group_by_field_id = 10;
int64 group_size = 11;
int64 field_id = 12;
bool ignore_growing = 13;
}
message SearchRequest {
@ -129,6 +130,7 @@ message SearchRequest {
int64 field_id = 25;
bool is_topk_reduce = 26;
bool is_recall_evaluation = 27;
bool is_iterator = 28;
}
message SubSearchResults {
@ -193,6 +195,8 @@ message RetrieveRequest {
string username = 15;
bool reduce_stop_for_best = 16; //deprecated
int32 reduce_type = 17;
common.ConsistencyLevel consistency_level = 18;
bool is_iterator = 19;
}

File diff suppressed because it is too large Load Diff