fix: local wal perform different with remote wal (#39967)

issue: #38399

Signed-off-by: chyezh <chyezh@outlook.com>
pull/40024/head
Zhen Ye 2025-02-19 19:12:51 +08:00 committed by GitHub
parent f47320e0e7
commit fd701eca71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 137 additions and 6 deletions

View File

@ -55,7 +55,7 @@ func (hc *handlerClientImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context,
}
// Get the wal at local registry.
w, err := registry.GetAvailableWAL(assign.Channel)
w, err := registry.GetLocalAvailableWAL(assign.Channel)
if err != nil {
return 0, err
}
@ -71,7 +71,7 @@ func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerO
p, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error) {
// Check if the localWAL is assigned at local
localWAL, err := registry.GetAvailableWAL(assign.Channel)
localWAL, err := registry.GetLocalAvailableWAL(assign.Channel)
if err == nil {
return localResult(localWAL), nil
}
@ -106,7 +106,7 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
c, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error) {
// Check if the localWAL is assigned at local
localWAL, err := registry.GetAvailableWAL(assign.Channel)
localWAL, err := registry.GetLocalAvailableWAL(assign.Channel)
if err == nil {
localScanner, err := localWAL.Read(ctx, wal.ReadOption{
VChannel: opts.VChannel,

View File

@ -0,0 +1,20 @@
package registry
var (
_ isLocal = localWAL{}
_ isLocal = localScanner{}
)
// localTrait is used to make isLocal can only be implemented by current package.
type localTrait struct{}
// isLocal is a hint interface for local wal.
type isLocal interface {
isLocal() localTrait
}
// IsLocal checks if the component is local.
func IsLocal(component any) bool {
_, ok := component.(isLocal)
return ok
}

View File

@ -7,6 +7,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
@ -28,12 +29,17 @@ func RegisterLocalWALManager(manager WALManager) {
log.Ctx(context.Background()).Info("register local wal manager done")
}
// GetAvailableWAL returns a available wal instance for the channel.
func GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) {
// GetLocalAvailableWAL returns a available wal instance for the channel.
func GetLocalAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) {
if !paramtable.IsLocalComponentEnabled(typeutil.StreamingNodeRole) {
return nil, ErrNoStreamingNodeDeployed
}
return registry.Get().GetAvailableWAL(channel)
l, err := registry.Get().GetAvailableWAL(channel)
if err != nil {
return nil, err
}
return localWAL{l}, nil // because the appended message object may be reused, make some difference between remote wal and local wal.
// so make a copy before appending for local wal to keep the consistency.
}
// WALManager is a hint type for wal manager at streaming node.
@ -42,3 +48,41 @@ type WALManager interface {
// Return nil if the wal instance is not found.
GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error)
}
// localWAL is a hint type for local wal.
type localWAL struct {
wal.WAL
}
func (l localWAL) isLocal() localTrait {
return localTrait{}
}
// Append writes a record to the log.
func (l localWAL) Append(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) {
msg = message.CloneMutableMessage(msg)
return l.WAL.Append(ctx, msg)
}
// Append a record to the log asynchronously.
func (l localWAL) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(*types.AppendResult, error)) {
msg = message.CloneMutableMessage(msg)
l.WAL.AppendAsync(ctx, msg, cb)
}
func (l localWAL) Read(ctx context.Context, deliverPolicy wal.ReadOption) (wal.Scanner, error) {
s, err := l.WAL.Read(ctx, deliverPolicy)
if err != nil {
return nil, err
}
return localScanner{s}, nil
}
// localScanner is a hint type for local wal scanner.
type localScanner struct {
wal.Scanner
}
func (s localScanner) isLocal() localTrait {
return localTrait{}
}

View File

@ -0,0 +1,55 @@
package registry
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type mockWALManager struct {
t *testing.T
}
func (m *mockWALManager) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) {
l := mock_wal.NewMockWAL(m.t)
l.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{}, nil)
l.EXPECT().AppendAsync(mock.Anything, mock.Anything, mock.Anything).Return()
l.EXPECT().Read(mock.Anything, mock.Anything).Return(mock_wal.NewMockScanner(m.t), nil)
return l, nil
}
func TestGetLocalAvailableWAL(t *testing.T) {
paramtable.Init()
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
manager := &mockWALManager{t: t}
RegisterLocalWALManager(manager)
walInstance, err := GetLocalAvailableWAL(types.PChannelInfo{})
assert.NoError(t, err)
assert.NotNil(t, walInstance)
assert.True(t, IsLocal(walInstance))
msg, _ := message.NewTimeTickMessageBuilderV1().
WithAllVChannel().
WithHeader(&message.TimeTickMessageHeader{}).
WithBody(&msgpb.TimeTickMsg{}).
BuildMutable()
walInstance.Append(context.Background(), msg)
walInstance.AppendAsync(context.Background(), msg, func(ar *wal.AppendResult, err error) {})
s, err := walInstance.Read(context.Background(), wal.ReadOption{})
assert.NoError(t, err)
assert.NotNil(t, walInstance)
assert.True(t, IsLocal(s))
}

View File

@ -234,6 +234,18 @@ func (m *messageImpl) SplitIntoMutableMessage() []MutableMessage {
return msgs
}
// CloneMutableMessage clones the current mutable message.
func CloneMutableMessage(msg MutableMessage) MutableMessage {
if msg == nil {
return nil
}
inner := msg.(*messageImpl)
return &messageImpl{
payload: inner.payload,
properties: inner.properties.Clone(),
}
}
type immutableMessageImpl struct {
messageImpl
id MessageID