enhance: wal interface definition (#33745)

issue: #33285

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/34090/head
chyezh 2024-06-24 10:34:12 +08:00 committed by GitHub
parent b961767005
commit b9237280c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
61 changed files with 3244 additions and 48 deletions

View File

@ -517,8 +517,8 @@ generate-mockery-chunk-manager: getdeps
generate-mockery-pkg:
$(MAKE) -C pkg generate-mockery
generate-mockery-log:
$(INSTALL_PATH)/mockery --config $(PWD)/internal/logservice/.mockery.yaml
generate-mockery-streaming:
$(INSTALL_PATH)/mockery --config $(PWD)/internal/streamingservice/.mockery.yaml
generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-log

3
go.mod
View File

@ -69,7 +69,9 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pkg/errors v0.9.1
github.com/remeh/sizedwaitgroup v1.0.0
github.com/zeebo/xxh3 v1.0.2
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)
@ -238,7 +240,6 @@ require (
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect

2
go.sum
View File

@ -762,6 +762,8 @@ github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/quasilyte/go-ruleguard/dsl v0.3.22 h1:wd8zkOhSNr+I+8Qeciml08ivDt1pSXe60+5DqOpCjPE=
github.com/quasilyte/go-ruleguard/dsl v0.3.22/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU=
github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E=
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ=

View File

@ -1,13 +0,0 @@
quiet: False
with-expecter: True
filename: "mock_{{.InterfaceName}}.go"
dir: "internal/mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/internal\" | dir }}/mock_{{.PackageName}}"
mockname: "Mock{{.InterfaceName}}"
outpkg: "mock_{{.PackageName}}"
packages:
github.com/milvus-io/milvus/internal/util/logserviceutil/message:
interfaces:
MessageID:
ImmutableMessage:
MutableMessage:
RProperties:

View File

@ -0,0 +1,124 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_wal
import (
context "context"
wal "github.com/milvus-io/milvus/internal/streamingnode/server/wal"
mock "github.com/stretchr/testify/mock"
)
// MockOpener is an autogenerated mock type for the Opener type
type MockOpener struct {
mock.Mock
}
type MockOpener_Expecter struct {
mock *mock.Mock
}
func (_m *MockOpener) EXPECT() *MockOpener_Expecter {
return &MockOpener_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
func (_m *MockOpener) Close() {
_m.Called()
}
// MockOpener_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockOpener_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockOpener_Expecter) Close() *MockOpener_Close_Call {
return &MockOpener_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockOpener_Close_Call) Run(run func()) *MockOpener_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockOpener_Close_Call) Return() *MockOpener_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockOpener_Close_Call) RunAndReturn(run func()) *MockOpener_Close_Call {
_c.Call.Return(run)
return _c
}
// Open provides a mock function with given fields: ctx, opt
func (_m *MockOpener) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) {
ret := _m.Called(ctx, opt)
var r0 wal.WAL
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *wal.OpenOption) (wal.WAL, error)); ok {
return rf(ctx, opt)
}
if rf, ok := ret.Get(0).(func(context.Context, *wal.OpenOption) wal.WAL); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(wal.WAL)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *wal.OpenOption) error); ok {
r1 = rf(ctx, opt)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockOpener_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open'
type MockOpener_Open_Call struct {
*mock.Call
}
// Open is a helper method to define mock.On call
// - ctx context.Context
// - opt *wal.OpenOption
func (_e *MockOpener_Expecter) Open(ctx interface{}, opt interface{}) *MockOpener_Open_Call {
return &MockOpener_Open_Call{Call: _e.mock.On("Open", ctx, opt)}
}
func (_c *MockOpener_Open_Call) Run(run func(ctx context.Context, opt *wal.OpenOption)) *MockOpener_Open_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*wal.OpenOption))
})
return _c
}
func (_c *MockOpener_Open_Call) Return(_a0 wal.WAL, _a1 error) *MockOpener_Open_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockOpener_Open_Call) RunAndReturn(run func(context.Context, *wal.OpenOption) (wal.WAL, error)) *MockOpener_Open_Call {
_c.Call.Return(run)
return _c
}
// NewMockOpener creates a new instance of MockOpener. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockOpener(t interface {
mock.TestingT
Cleanup(func())
}) *MockOpener {
mock := &MockOpener{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,129 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_wal
import (
wal "github.com/milvus-io/milvus/internal/streamingnode/server/wal"
mock "github.com/stretchr/testify/mock"
)
// MockOpenerBuilder is an autogenerated mock type for the OpenerBuilder type
type MockOpenerBuilder struct {
mock.Mock
}
type MockOpenerBuilder_Expecter struct {
mock *mock.Mock
}
func (_m *MockOpenerBuilder) EXPECT() *MockOpenerBuilder_Expecter {
return &MockOpenerBuilder_Expecter{mock: &_m.Mock}
}
// Build provides a mock function with given fields:
func (_m *MockOpenerBuilder) Build() (wal.Opener, error) {
ret := _m.Called()
var r0 wal.Opener
var r1 error
if rf, ok := ret.Get(0).(func() (wal.Opener, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() wal.Opener); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(wal.Opener)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockOpenerBuilder_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build'
type MockOpenerBuilder_Build_Call struct {
*mock.Call
}
// Build is a helper method to define mock.On call
func (_e *MockOpenerBuilder_Expecter) Build() *MockOpenerBuilder_Build_Call {
return &MockOpenerBuilder_Build_Call{Call: _e.mock.On("Build")}
}
func (_c *MockOpenerBuilder_Build_Call) Run(run func()) *MockOpenerBuilder_Build_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockOpenerBuilder_Build_Call) Return(_a0 wal.Opener, _a1 error) *MockOpenerBuilder_Build_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockOpenerBuilder_Build_Call) RunAndReturn(run func() (wal.Opener, error)) *MockOpenerBuilder_Build_Call {
_c.Call.Return(run)
return _c
}
// Name provides a mock function with given fields:
func (_m *MockOpenerBuilder) Name() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockOpenerBuilder_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'
type MockOpenerBuilder_Name_Call struct {
*mock.Call
}
// Name is a helper method to define mock.On call
func (_e *MockOpenerBuilder_Expecter) Name() *MockOpenerBuilder_Name_Call {
return &MockOpenerBuilder_Name_Call{Call: _e.mock.On("Name")}
}
func (_c *MockOpenerBuilder_Name_Call) Run(run func()) *MockOpenerBuilder_Name_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockOpenerBuilder_Name_Call) Return(_a0 string) *MockOpenerBuilder_Name_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockOpenerBuilder_Name_Call) RunAndReturn(run func() string) *MockOpenerBuilder_Name_Call {
_c.Call.Return(run)
return _c
}
// NewMockOpenerBuilder creates a new instance of MockOpenerBuilder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockOpenerBuilder(t interface {
mock.TestingT
Cleanup(func())
}) *MockOpenerBuilder {
mock := &MockOpenerBuilder{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,203 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_wal
import (
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
)
// MockScanner is an autogenerated mock type for the Scanner type
type MockScanner struct {
mock.Mock
}
type MockScanner_Expecter struct {
mock *mock.Mock
}
func (_m *MockScanner) EXPECT() *MockScanner_Expecter {
return &MockScanner_Expecter{mock: &_m.Mock}
}
// Chan provides a mock function with given fields:
func (_m *MockScanner) Chan() <-chan message.ImmutableMessage {
ret := _m.Called()
var r0 <-chan message.ImmutableMessage
if rf, ok := ret.Get(0).(func() <-chan message.ImmutableMessage); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan message.ImmutableMessage)
}
}
return r0
}
// MockScanner_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan'
type MockScanner_Chan_Call struct {
*mock.Call
}
// Chan is a helper method to define mock.On call
func (_e *MockScanner_Expecter) Chan() *MockScanner_Chan_Call {
return &MockScanner_Chan_Call{Call: _e.mock.On("Chan")}
}
func (_c *MockScanner_Chan_Call) Run(run func()) *MockScanner_Chan_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScanner_Chan_Call) Return(_a0 <-chan message.ImmutableMessage) *MockScanner_Chan_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScanner_Chan_Call) RunAndReturn(run func() <-chan message.ImmutableMessage) *MockScanner_Chan_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockScanner) Close() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockScanner_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockScanner_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockScanner_Expecter) Close() *MockScanner_Close_Call {
return &MockScanner_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockScanner_Close_Call) Run(run func()) *MockScanner_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScanner_Close_Call) Return(_a0 error) *MockScanner_Close_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScanner_Close_Call) RunAndReturn(run func() error) *MockScanner_Close_Call {
_c.Call.Return(run)
return _c
}
// Done provides a mock function with given fields:
func (_m *MockScanner) Done() <-chan struct{} {
ret := _m.Called()
var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}
// MockScanner_Done_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Done'
type MockScanner_Done_Call struct {
*mock.Call
}
// Done is a helper method to define mock.On call
func (_e *MockScanner_Expecter) Done() *MockScanner_Done_Call {
return &MockScanner_Done_Call{Call: _e.mock.On("Done")}
}
func (_c *MockScanner_Done_Call) Run(run func()) *MockScanner_Done_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScanner_Done_Call) Return(_a0 <-chan struct{}) *MockScanner_Done_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScanner_Done_Call) RunAndReturn(run func() <-chan struct{}) *MockScanner_Done_Call {
_c.Call.Return(run)
return _c
}
// Error provides a mock function with given fields:
func (_m *MockScanner) Error() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockScanner_Error_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Error'
type MockScanner_Error_Call struct {
*mock.Call
}
// Error is a helper method to define mock.On call
func (_e *MockScanner_Expecter) Error() *MockScanner_Error_Call {
return &MockScanner_Error_Call{Call: _e.mock.On("Error")}
}
func (_c *MockScanner_Error_Call) Run(run func()) *MockScanner_Error_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScanner_Error_Call) Return(_a0 error) *MockScanner_Error_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScanner_Error_Call) RunAndReturn(run func() error) *MockScanner_Error_Call {
_c.Call.Return(run)
return _c
}
// NewMockScanner creates a new instance of MockScanner. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockScanner(t interface {
mock.TestingT
Cleanup(func())
}) *MockScanner {
mock := &MockScanner{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,261 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_wal
import (
context "context"
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
streamingpb "github.com/milvus-io/milvus/internal/proto/streamingpb"
wal "github.com/milvus-io/milvus/internal/streamingnode/server/wal"
)
// MockWAL is an autogenerated mock type for the WAL type
type MockWAL struct {
mock.Mock
}
type MockWAL_Expecter struct {
mock *mock.Mock
}
func (_m *MockWAL) EXPECT() *MockWAL_Expecter {
return &MockWAL_Expecter{mock: &_m.Mock}
}
// Append provides a mock function with given fields: ctx, msg
func (_m *MockWAL) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
ret := _m.Called(ctx, msg)
var r0 message.MessageID
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (message.MessageID, error)); ok {
return rf(ctx, msg)
}
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) message.MessageID); ok {
r0 = rf(ctx, msg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MessageID)
}
}
if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage) error); ok {
r1 = rf(ctx, msg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockWAL_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append'
type MockWAL_Append_Call struct {
*mock.Call
}
// Append is a helper method to define mock.On call
// - ctx context.Context
// - msg message.MutableMessage
func (_e *MockWAL_Expecter) Append(ctx interface{}, msg interface{}) *MockWAL_Append_Call {
return &MockWAL_Append_Call{Call: _e.mock.On("Append", ctx, msg)}
}
func (_c *MockWAL_Append_Call) Run(run func(ctx context.Context, msg message.MutableMessage)) *MockWAL_Append_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(message.MutableMessage))
})
return _c
}
func (_c *MockWAL_Append_Call) Return(_a0 message.MessageID, _a1 error) *MockWAL_Append_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockWAL_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (message.MessageID, error)) *MockWAL_Append_Call {
_c.Call.Return(run)
return _c
}
// AppendAsync provides a mock function with given fields: ctx, msg, cb
func (_m *MockWAL) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error)) {
_m.Called(ctx, msg, cb)
}
// MockWAL_AppendAsync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AppendAsync'
type MockWAL_AppendAsync_Call struct {
*mock.Call
}
// AppendAsync is a helper method to define mock.On call
// - ctx context.Context
// - msg message.MutableMessage
// - cb func(message.MessageID , error)
func (_e *MockWAL_Expecter) AppendAsync(ctx interface{}, msg interface{}, cb interface{}) *MockWAL_AppendAsync_Call {
return &MockWAL_AppendAsync_Call{Call: _e.mock.On("AppendAsync", ctx, msg, cb)}
}
func (_c *MockWAL_AppendAsync_Call) Run(run func(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error))) *MockWAL_AppendAsync_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(message.MessageID, error)))
})
return _c
}
func (_c *MockWAL_AppendAsync_Call) Return() *MockWAL_AppendAsync_Call {
_c.Call.Return()
return _c
}
func (_c *MockWAL_AppendAsync_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(message.MessageID, error))) *MockWAL_AppendAsync_Call {
_c.Call.Return(run)
return _c
}
// Channel provides a mock function with given fields:
func (_m *MockWAL) Channel() *streamingpb.PChannelInfo {
ret := _m.Called()
var r0 *streamingpb.PChannelInfo
if rf, ok := ret.Get(0).(func() *streamingpb.PChannelInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*streamingpb.PChannelInfo)
}
}
return r0
}
// MockWAL_Channel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Channel'
type MockWAL_Channel_Call struct {
*mock.Call
}
// Channel is a helper method to define mock.On call
func (_e *MockWAL_Expecter) Channel() *MockWAL_Channel_Call {
return &MockWAL_Channel_Call{Call: _e.mock.On("Channel")}
}
func (_c *MockWAL_Channel_Call) Run(run func()) *MockWAL_Channel_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWAL_Channel_Call) Return(_a0 *streamingpb.PChannelInfo) *MockWAL_Channel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWAL_Channel_Call) RunAndReturn(run func() *streamingpb.PChannelInfo) *MockWAL_Channel_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockWAL) Close() {
_m.Called()
}
// MockWAL_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockWAL_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockWAL_Expecter) Close() *MockWAL_Close_Call {
return &MockWAL_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockWAL_Close_Call) Run(run func()) *MockWAL_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWAL_Close_Call) Return() *MockWAL_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockWAL_Close_Call) RunAndReturn(run func()) *MockWAL_Close_Call {
_c.Call.Return(run)
return _c
}
// Read provides a mock function with given fields: ctx, deliverPolicy
func (_m *MockWAL) Read(ctx context.Context, deliverPolicy wal.ReadOption) (wal.Scanner, error) {
ret := _m.Called(ctx, deliverPolicy)
var r0 wal.Scanner
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, wal.ReadOption) (wal.Scanner, error)); ok {
return rf(ctx, deliverPolicy)
}
if rf, ok := ret.Get(0).(func(context.Context, wal.ReadOption) wal.Scanner); ok {
r0 = rf(ctx, deliverPolicy)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(wal.Scanner)
}
}
if rf, ok := ret.Get(1).(func(context.Context, wal.ReadOption) error); ok {
r1 = rf(ctx, deliverPolicy)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockWAL_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read'
type MockWAL_Read_Call struct {
*mock.Call
}
// Read is a helper method to define mock.On call
// - ctx context.Context
// - deliverPolicy wal.ReadOption
func (_e *MockWAL_Expecter) Read(ctx interface{}, deliverPolicy interface{}) *MockWAL_Read_Call {
return &MockWAL_Read_Call{Call: _e.mock.On("Read", ctx, deliverPolicy)}
}
func (_c *MockWAL_Read_Call) Run(run func(ctx context.Context, deliverPolicy wal.ReadOption)) *MockWAL_Read_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(wal.ReadOption))
})
return _c
}
func (_c *MockWAL_Read_Call) Return(_a0 wal.Scanner, _a1 error) *MockWAL_Read_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockWAL_Read_Call) RunAndReturn(run func(context.Context, wal.ReadOption) (wal.Scanner, error)) *MockWAL_Read_Call {
_c.Call.Return(run)
return _c
}
// NewMockWAL creates a new instance of MockWAL. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockWAL(t interface {
mock.TestingT
Cleanup(func())
}) *MockWAL {
mock := &MockWAL{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,125 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_walimpls
import (
context "context"
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
)
// MockInterceptor is an autogenerated mock type for the Interceptor type
type MockInterceptor struct {
mock.Mock
}
type MockInterceptor_Expecter struct {
mock *mock.Mock
}
func (_m *MockInterceptor) EXPECT() *MockInterceptor_Expecter {
return &MockInterceptor_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
func (_m *MockInterceptor) Close() {
_m.Called()
}
// MockInterceptor_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockInterceptor_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockInterceptor_Expecter) Close() *MockInterceptor_Close_Call {
return &MockInterceptor_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockInterceptor_Close_Call) Run(run func()) *MockInterceptor_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockInterceptor_Close_Call) Return() *MockInterceptor_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockInterceptor_Close_Call) RunAndReturn(run func()) *MockInterceptor_Close_Call {
_c.Call.Return(run)
return _c
}
// DoAppend provides a mock function with given fields: ctx, msg, append
func (_m *MockInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) {
ret := _m.Called(ctx, msg, append)
var r0 message.MessageID
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)); ok {
return rf(ctx, msg, append)
}
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) message.MessageID); ok {
r0 = rf(ctx, msg, append)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MessageID)
}
}
if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) error); ok {
r1 = rf(ctx, msg, append)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockInterceptor_DoAppend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DoAppend'
type MockInterceptor_DoAppend_Call struct {
*mock.Call
}
// DoAppend is a helper method to define mock.On call
// - ctx context.Context
// - msg message.MutableMessage
// - append func(context.Context , message.MutableMessage)(message.MessageID , error)
func (_e *MockInterceptor_Expecter) DoAppend(ctx interface{}, msg interface{}, append interface{}) *MockInterceptor_DoAppend_Call {
return &MockInterceptor_DoAppend_Call{Call: _e.mock.On("DoAppend", ctx, msg, append)}
}
func (_c *MockInterceptor_DoAppend_Call) Run(run func(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error))) *MockInterceptor_DoAppend_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(context.Context, message.MutableMessage) (message.MessageID, error)))
})
return _c
}
func (_c *MockInterceptor_DoAppend_Call) Return(_a0 message.MessageID, _a1 error) *MockInterceptor_DoAppend_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockInterceptor_DoAppend_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)) *MockInterceptor_DoAppend_Call {
_c.Call.Return(run)
return _c
}
// NewMockInterceptor creates a new instance of MockInterceptor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockInterceptor(t interface {
mock.TestingT
Cleanup(func())
}) *MockInterceptor {
mock := &MockInterceptor{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,79 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_walimpls
import (
walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
mock "github.com/stretchr/testify/mock"
)
// MockInterceptorBuilder is an autogenerated mock type for the InterceptorBuilder type
type MockInterceptorBuilder struct {
mock.Mock
}
type MockInterceptorBuilder_Expecter struct {
mock *mock.Mock
}
func (_m *MockInterceptorBuilder) EXPECT() *MockInterceptorBuilder_Expecter {
return &MockInterceptorBuilder_Expecter{mock: &_m.Mock}
}
// Build provides a mock function with given fields: wal
func (_m *MockInterceptorBuilder) Build(wal <-chan walimpls.WALImpls) walimpls.BasicInterceptor {
ret := _m.Called(wal)
var r0 walimpls.BasicInterceptor
if rf, ok := ret.Get(0).(func(<-chan walimpls.WALImpls) walimpls.BasicInterceptor); ok {
r0 = rf(wal)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(walimpls.BasicInterceptor)
}
}
return r0
}
// MockInterceptorBuilder_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build'
type MockInterceptorBuilder_Build_Call struct {
*mock.Call
}
// Build is a helper method to define mock.On call
// - wal <-chan walimpls.WALImpls
func (_e *MockInterceptorBuilder_Expecter) Build(wal interface{}) *MockInterceptorBuilder_Build_Call {
return &MockInterceptorBuilder_Build_Call{Call: _e.mock.On("Build", wal)}
}
func (_c *MockInterceptorBuilder_Build_Call) Run(run func(wal <-chan walimpls.WALImpls)) *MockInterceptorBuilder_Build_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(<-chan walimpls.WALImpls))
})
return _c
}
func (_c *MockInterceptorBuilder_Build_Call) Return(_a0 walimpls.BasicInterceptor) *MockInterceptorBuilder_Build_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockInterceptorBuilder_Build_Call) RunAndReturn(run func(<-chan walimpls.WALImpls) walimpls.BasicInterceptor) *MockInterceptorBuilder_Build_Call {
_c.Call.Return(run)
return _c
}
// NewMockInterceptorBuilder creates a new instance of MockInterceptorBuilder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockInterceptorBuilder(t interface {
mock.TestingT
Cleanup(func())
}) *MockInterceptorBuilder {
mock := &MockInterceptorBuilder{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,168 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_walimpls
import (
context "context"
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
)
// MockInterceptorWithReady is an autogenerated mock type for the InterceptorWithReady type
type MockInterceptorWithReady struct {
mock.Mock
}
type MockInterceptorWithReady_Expecter struct {
mock *mock.Mock
}
func (_m *MockInterceptorWithReady) EXPECT() *MockInterceptorWithReady_Expecter {
return &MockInterceptorWithReady_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
func (_m *MockInterceptorWithReady) Close() {
_m.Called()
}
// MockInterceptorWithReady_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockInterceptorWithReady_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockInterceptorWithReady_Expecter) Close() *MockInterceptorWithReady_Close_Call {
return &MockInterceptorWithReady_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockInterceptorWithReady_Close_Call) Run(run func()) *MockInterceptorWithReady_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockInterceptorWithReady_Close_Call) Return() *MockInterceptorWithReady_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockInterceptorWithReady_Close_Call) RunAndReturn(run func()) *MockInterceptorWithReady_Close_Call {
_c.Call.Return(run)
return _c
}
// DoAppend provides a mock function with given fields: ctx, msg, append
func (_m *MockInterceptorWithReady) DoAppend(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) {
ret := _m.Called(ctx, msg, append)
var r0 message.MessageID
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)); ok {
return rf(ctx, msg, append)
}
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) message.MessageID); ok {
r0 = rf(ctx, msg, append)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MessageID)
}
}
if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) error); ok {
r1 = rf(ctx, msg, append)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockInterceptorWithReady_DoAppend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DoAppend'
type MockInterceptorWithReady_DoAppend_Call struct {
*mock.Call
}
// DoAppend is a helper method to define mock.On call
// - ctx context.Context
// - msg message.MutableMessage
// - append func(context.Context , message.MutableMessage)(message.MessageID , error)
func (_e *MockInterceptorWithReady_Expecter) DoAppend(ctx interface{}, msg interface{}, append interface{}) *MockInterceptorWithReady_DoAppend_Call {
return &MockInterceptorWithReady_DoAppend_Call{Call: _e.mock.On("DoAppend", ctx, msg, append)}
}
func (_c *MockInterceptorWithReady_DoAppend_Call) Run(run func(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error))) *MockInterceptorWithReady_DoAppend_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(context.Context, message.MutableMessage) (message.MessageID, error)))
})
return _c
}
func (_c *MockInterceptorWithReady_DoAppend_Call) Return(_a0 message.MessageID, _a1 error) *MockInterceptorWithReady_DoAppend_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockInterceptorWithReady_DoAppend_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)) *MockInterceptorWithReady_DoAppend_Call {
_c.Call.Return(run)
return _c
}
// Ready provides a mock function with given fields:
func (_m *MockInterceptorWithReady) Ready() <-chan struct{} {
ret := _m.Called()
var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}
// MockInterceptorWithReady_Ready_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ready'
type MockInterceptorWithReady_Ready_Call struct {
*mock.Call
}
// Ready is a helper method to define mock.On call
func (_e *MockInterceptorWithReady_Expecter) Ready() *MockInterceptorWithReady_Ready_Call {
return &MockInterceptorWithReady_Ready_Call{Call: _e.mock.On("Ready")}
}
func (_c *MockInterceptorWithReady_Ready_Call) Run(run func()) *MockInterceptorWithReady_Ready_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockInterceptorWithReady_Ready_Call) Return(_a0 <-chan struct{}) *MockInterceptorWithReady_Ready_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockInterceptorWithReady_Ready_Call) RunAndReturn(run func() <-chan struct{}) *MockInterceptorWithReady_Ready_Call {
_c.Call.Return(run)
return _c
}
// NewMockInterceptorWithReady creates a new instance of MockInterceptorWithReady. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockInterceptorWithReady(t interface {
mock.TestingT
Cleanup(func())
}) *MockInterceptorWithReady {
mock := &MockInterceptorWithReady{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,129 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_walimpls
import (
walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
mock "github.com/stretchr/testify/mock"
)
// MockOpenerBuilderImpls is an autogenerated mock type for the OpenerBuilderImpls type
type MockOpenerBuilderImpls struct {
mock.Mock
}
type MockOpenerBuilderImpls_Expecter struct {
mock *mock.Mock
}
func (_m *MockOpenerBuilderImpls) EXPECT() *MockOpenerBuilderImpls_Expecter {
return &MockOpenerBuilderImpls_Expecter{mock: &_m.Mock}
}
// Build provides a mock function with given fields:
func (_m *MockOpenerBuilderImpls) Build() (walimpls.OpenerImpls, error) {
ret := _m.Called()
var r0 walimpls.OpenerImpls
var r1 error
if rf, ok := ret.Get(0).(func() (walimpls.OpenerImpls, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() walimpls.OpenerImpls); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(walimpls.OpenerImpls)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockOpenerBuilderImpls_Build_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Build'
type MockOpenerBuilderImpls_Build_Call struct {
*mock.Call
}
// Build is a helper method to define mock.On call
func (_e *MockOpenerBuilderImpls_Expecter) Build() *MockOpenerBuilderImpls_Build_Call {
return &MockOpenerBuilderImpls_Build_Call{Call: _e.mock.On("Build")}
}
func (_c *MockOpenerBuilderImpls_Build_Call) Run(run func()) *MockOpenerBuilderImpls_Build_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockOpenerBuilderImpls_Build_Call) Return(_a0 walimpls.OpenerImpls, _a1 error) *MockOpenerBuilderImpls_Build_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockOpenerBuilderImpls_Build_Call) RunAndReturn(run func() (walimpls.OpenerImpls, error)) *MockOpenerBuilderImpls_Build_Call {
_c.Call.Return(run)
return _c
}
// Name provides a mock function with given fields:
func (_m *MockOpenerBuilderImpls) Name() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockOpenerBuilderImpls_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'
type MockOpenerBuilderImpls_Name_Call struct {
*mock.Call
}
// Name is a helper method to define mock.On call
func (_e *MockOpenerBuilderImpls_Expecter) Name() *MockOpenerBuilderImpls_Name_Call {
return &MockOpenerBuilderImpls_Name_Call{Call: _e.mock.On("Name")}
}
func (_c *MockOpenerBuilderImpls_Name_Call) Run(run func()) *MockOpenerBuilderImpls_Name_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockOpenerBuilderImpls_Name_Call) Return(_a0 string) *MockOpenerBuilderImpls_Name_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockOpenerBuilderImpls_Name_Call) RunAndReturn(run func() string) *MockOpenerBuilderImpls_Name_Call {
_c.Call.Return(run)
return _c
}
// NewMockOpenerBuilderImpls creates a new instance of MockOpenerBuilderImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockOpenerBuilderImpls(t interface {
mock.TestingT
Cleanup(func())
}) *MockOpenerBuilderImpls {
mock := &MockOpenerBuilderImpls{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,124 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_walimpls
import (
context "context"
walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
mock "github.com/stretchr/testify/mock"
)
// MockOpenerImpls is an autogenerated mock type for the OpenerImpls type
type MockOpenerImpls struct {
mock.Mock
}
type MockOpenerImpls_Expecter struct {
mock *mock.Mock
}
func (_m *MockOpenerImpls) EXPECT() *MockOpenerImpls_Expecter {
return &MockOpenerImpls_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
func (_m *MockOpenerImpls) Close() {
_m.Called()
}
// MockOpenerImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockOpenerImpls_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockOpenerImpls_Expecter) Close() *MockOpenerImpls_Close_Call {
return &MockOpenerImpls_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockOpenerImpls_Close_Call) Run(run func()) *MockOpenerImpls_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockOpenerImpls_Close_Call) Return() *MockOpenerImpls_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockOpenerImpls_Close_Call) RunAndReturn(run func()) *MockOpenerImpls_Close_Call {
_c.Call.Return(run)
return _c
}
// Open provides a mock function with given fields: ctx, opt
func (_m *MockOpenerImpls) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) {
ret := _m.Called(ctx, opt)
var r0 walimpls.WALImpls
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *walimpls.OpenOption) (walimpls.WALImpls, error)); ok {
return rf(ctx, opt)
}
if rf, ok := ret.Get(0).(func(context.Context, *walimpls.OpenOption) walimpls.WALImpls); ok {
r0 = rf(ctx, opt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(walimpls.WALImpls)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *walimpls.OpenOption) error); ok {
r1 = rf(ctx, opt)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockOpenerImpls_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open'
type MockOpenerImpls_Open_Call struct {
*mock.Call
}
// Open is a helper method to define mock.On call
// - ctx context.Context
// - opt *walimpls.OpenOption
func (_e *MockOpenerImpls_Expecter) Open(ctx interface{}, opt interface{}) *MockOpenerImpls_Open_Call {
return &MockOpenerImpls_Open_Call{Call: _e.mock.On("Open", ctx, opt)}
}
func (_c *MockOpenerImpls_Open_Call) Run(run func(ctx context.Context, opt *walimpls.OpenOption)) *MockOpenerImpls_Open_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*walimpls.OpenOption))
})
return _c
}
func (_c *MockOpenerImpls_Open_Call) Return(_a0 walimpls.WALImpls, _a1 error) *MockOpenerImpls_Open_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockOpenerImpls_Open_Call) RunAndReturn(run func(context.Context, *walimpls.OpenOption) (walimpls.WALImpls, error)) *MockOpenerImpls_Open_Call {
_c.Call.Return(run)
return _c
}
// NewMockOpenerImpls creates a new instance of MockOpenerImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockOpenerImpls(t interface {
mock.TestingT
Cleanup(func())
}) *MockOpenerImpls {
mock := &MockOpenerImpls{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,244 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_walimpls
import (
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
)
// MockScannerImpls is an autogenerated mock type for the ScannerImpls type
type MockScannerImpls struct {
mock.Mock
}
type MockScannerImpls_Expecter struct {
mock *mock.Mock
}
func (_m *MockScannerImpls) EXPECT() *MockScannerImpls_Expecter {
return &MockScannerImpls_Expecter{mock: &_m.Mock}
}
// Chan provides a mock function with given fields:
func (_m *MockScannerImpls) Chan() <-chan message.ImmutableMessage {
ret := _m.Called()
var r0 <-chan message.ImmutableMessage
if rf, ok := ret.Get(0).(func() <-chan message.ImmutableMessage); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan message.ImmutableMessage)
}
}
return r0
}
// MockScannerImpls_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan'
type MockScannerImpls_Chan_Call struct {
*mock.Call
}
// Chan is a helper method to define mock.On call
func (_e *MockScannerImpls_Expecter) Chan() *MockScannerImpls_Chan_Call {
return &MockScannerImpls_Chan_Call{Call: _e.mock.On("Chan")}
}
func (_c *MockScannerImpls_Chan_Call) Run(run func()) *MockScannerImpls_Chan_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScannerImpls_Chan_Call) Return(_a0 <-chan message.ImmutableMessage) *MockScannerImpls_Chan_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScannerImpls_Chan_Call) RunAndReturn(run func() <-chan message.ImmutableMessage) *MockScannerImpls_Chan_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockScannerImpls) Close() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockScannerImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockScannerImpls_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockScannerImpls_Expecter) Close() *MockScannerImpls_Close_Call {
return &MockScannerImpls_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockScannerImpls_Close_Call) Run(run func()) *MockScannerImpls_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScannerImpls_Close_Call) Return(_a0 error) *MockScannerImpls_Close_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScannerImpls_Close_Call) RunAndReturn(run func() error) *MockScannerImpls_Close_Call {
_c.Call.Return(run)
return _c
}
// Done provides a mock function with given fields:
func (_m *MockScannerImpls) Done() <-chan struct{} {
ret := _m.Called()
var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}
// MockScannerImpls_Done_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Done'
type MockScannerImpls_Done_Call struct {
*mock.Call
}
// Done is a helper method to define mock.On call
func (_e *MockScannerImpls_Expecter) Done() *MockScannerImpls_Done_Call {
return &MockScannerImpls_Done_Call{Call: _e.mock.On("Done")}
}
func (_c *MockScannerImpls_Done_Call) Run(run func()) *MockScannerImpls_Done_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScannerImpls_Done_Call) Return(_a0 <-chan struct{}) *MockScannerImpls_Done_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScannerImpls_Done_Call) RunAndReturn(run func() <-chan struct{}) *MockScannerImpls_Done_Call {
_c.Call.Return(run)
return _c
}
// Error provides a mock function with given fields:
func (_m *MockScannerImpls) Error() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockScannerImpls_Error_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Error'
type MockScannerImpls_Error_Call struct {
*mock.Call
}
// Error is a helper method to define mock.On call
func (_e *MockScannerImpls_Expecter) Error() *MockScannerImpls_Error_Call {
return &MockScannerImpls_Error_Call{Call: _e.mock.On("Error")}
}
func (_c *MockScannerImpls_Error_Call) Run(run func()) *MockScannerImpls_Error_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScannerImpls_Error_Call) Return(_a0 error) *MockScannerImpls_Error_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScannerImpls_Error_Call) RunAndReturn(run func() error) *MockScannerImpls_Error_Call {
_c.Call.Return(run)
return _c
}
// Name provides a mock function with given fields:
func (_m *MockScannerImpls) Name() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockScannerImpls_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'
type MockScannerImpls_Name_Call struct {
*mock.Call
}
// Name is a helper method to define mock.On call
func (_e *MockScannerImpls_Expecter) Name() *MockScannerImpls_Name_Call {
return &MockScannerImpls_Name_Call{Call: _e.mock.On("Name")}
}
func (_c *MockScannerImpls_Name_Call) Run(run func()) *MockScannerImpls_Name_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockScannerImpls_Name_Call) Return(_a0 string) *MockScannerImpls_Name_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockScannerImpls_Name_Call) RunAndReturn(run func() string) *MockScannerImpls_Name_Call {
_c.Call.Return(run)
return _c
}
// NewMockScannerImpls creates a new instance of MockScannerImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockScannerImpls(t interface {
mock.TestingT
Cleanup(func())
}) *MockScannerImpls {
mock := &MockScannerImpls{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,226 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_walimpls
import (
context "context"
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
streamingpb "github.com/milvus-io/milvus/internal/proto/streamingpb"
walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
)
// MockWALImpls is an autogenerated mock type for the WALImpls type
type MockWALImpls struct {
mock.Mock
}
type MockWALImpls_Expecter struct {
mock *mock.Mock
}
func (_m *MockWALImpls) EXPECT() *MockWALImpls_Expecter {
return &MockWALImpls_Expecter{mock: &_m.Mock}
}
// Append provides a mock function with given fields: ctx, msg
func (_m *MockWALImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
ret := _m.Called(ctx, msg)
var r0 message.MessageID
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (message.MessageID, error)); ok {
return rf(ctx, msg)
}
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) message.MessageID); ok {
r0 = rf(ctx, msg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MessageID)
}
}
if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage) error); ok {
r1 = rf(ctx, msg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockWALImpls_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append'
type MockWALImpls_Append_Call struct {
*mock.Call
}
// Append is a helper method to define mock.On call
// - ctx context.Context
// - msg message.MutableMessage
func (_e *MockWALImpls_Expecter) Append(ctx interface{}, msg interface{}) *MockWALImpls_Append_Call {
return &MockWALImpls_Append_Call{Call: _e.mock.On("Append", ctx, msg)}
}
func (_c *MockWALImpls_Append_Call) Run(run func(ctx context.Context, msg message.MutableMessage)) *MockWALImpls_Append_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(message.MutableMessage))
})
return _c
}
func (_c *MockWALImpls_Append_Call) Return(_a0 message.MessageID, _a1 error) *MockWALImpls_Append_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockWALImpls_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (message.MessageID, error)) *MockWALImpls_Append_Call {
_c.Call.Return(run)
return _c
}
// Channel provides a mock function with given fields:
func (_m *MockWALImpls) Channel() *streamingpb.PChannelInfo {
ret := _m.Called()
var r0 *streamingpb.PChannelInfo
if rf, ok := ret.Get(0).(func() *streamingpb.PChannelInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*streamingpb.PChannelInfo)
}
}
return r0
}
// MockWALImpls_Channel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Channel'
type MockWALImpls_Channel_Call struct {
*mock.Call
}
// Channel is a helper method to define mock.On call
func (_e *MockWALImpls_Expecter) Channel() *MockWALImpls_Channel_Call {
return &MockWALImpls_Channel_Call{Call: _e.mock.On("Channel")}
}
func (_c *MockWALImpls_Channel_Call) Run(run func()) *MockWALImpls_Channel_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWALImpls_Channel_Call) Return(_a0 *streamingpb.PChannelInfo) *MockWALImpls_Channel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWALImpls_Channel_Call) RunAndReturn(run func() *streamingpb.PChannelInfo) *MockWALImpls_Channel_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockWALImpls) Close() {
_m.Called()
}
// MockWALImpls_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockWALImpls_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockWALImpls_Expecter) Close() *MockWALImpls_Close_Call {
return &MockWALImpls_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockWALImpls_Close_Call) Run(run func()) *MockWALImpls_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWALImpls_Close_Call) Return() *MockWALImpls_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockWALImpls_Close_Call) RunAndReturn(run func()) *MockWALImpls_Close_Call {
_c.Call.Return(run)
return _c
}
// Read provides a mock function with given fields: ctx, opts
func (_m *MockWALImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) {
ret := _m.Called(ctx, opts)
var r0 walimpls.ScannerImpls
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, walimpls.ReadOption) (walimpls.ScannerImpls, error)); ok {
return rf(ctx, opts)
}
if rf, ok := ret.Get(0).(func(context.Context, walimpls.ReadOption) walimpls.ScannerImpls); ok {
r0 = rf(ctx, opts)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(walimpls.ScannerImpls)
}
}
if rf, ok := ret.Get(1).(func(context.Context, walimpls.ReadOption) error); ok {
r1 = rf(ctx, opts)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockWALImpls_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read'
type MockWALImpls_Read_Call struct {
*mock.Call
}
// Read is a helper method to define mock.On call
// - ctx context.Context
// - opts walimpls.ReadOption
func (_e *MockWALImpls_Expecter) Read(ctx interface{}, opts interface{}) *MockWALImpls_Read_Call {
return &MockWALImpls_Read_Call{Call: _e.mock.On("Read", ctx, opts)}
}
func (_c *MockWALImpls_Read_Call) Run(run func(ctx context.Context, opts walimpls.ReadOption)) *MockWALImpls_Read_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(walimpls.ReadOption))
})
return _c
}
func (_c *MockWALImpls_Read_Call) Return(_a0 walimpls.ScannerImpls, _a1 error) *MockWALImpls_Read_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockWALImpls_Read_Call) RunAndReturn(run func(context.Context, walimpls.ReadOption) (walimpls.ScannerImpls, error)) *MockWALImpls_Read_Call {
_c.Call.Return(run)
return _c
}
// NewMockWALImpls creates a new instance of MockWALImpls. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockWALImpls(t interface {
mock.TestingT
Cleanup(func())
}) *MockWALImpls {
mock := &MockWALImpls{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -3,7 +3,7 @@
package mock_message
import (
message "github.com/milvus-io/milvus/internal/util/logserviceutil/message"
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
)

View File

@ -3,7 +3,7 @@
package mock_message
import (
message "github.com/milvus-io/milvus/internal/util/logserviceutil/message"
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
)

View File

@ -3,7 +3,7 @@
package mock_message
import (
message "github.com/milvus-io/milvus/internal/util/logserviceutil/message"
message "github.com/milvus-io/milvus/internal/util/streamingutil/message"
mock "github.com/stretchr/testify/mock"
)
@ -61,6 +61,50 @@ func (_c *MockMutableMessage_EstimateSize_Call) RunAndReturn(run func() int) *Mo
return _c
}
// IntoImmutableMessage provides a mock function with given fields: msgID
func (_m *MockMutableMessage) IntoImmutableMessage(msgID message.MessageID) message.ImmutableMessage {
ret := _m.Called(msgID)
var r0 message.ImmutableMessage
if rf, ok := ret.Get(0).(func(message.MessageID) message.ImmutableMessage); ok {
r0 = rf(msgID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.ImmutableMessage)
}
}
return r0
}
// MockMutableMessage_IntoImmutableMessage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IntoImmutableMessage'
type MockMutableMessage_IntoImmutableMessage_Call struct {
*mock.Call
}
// IntoImmutableMessage is a helper method to define mock.On call
// - msgID message.MessageID
func (_e *MockMutableMessage_Expecter) IntoImmutableMessage(msgID interface{}) *MockMutableMessage_IntoImmutableMessage_Call {
return &MockMutableMessage_IntoImmutableMessage_Call{Call: _e.mock.On("IntoImmutableMessage", msgID)}
}
func (_c *MockMutableMessage_IntoImmutableMessage_Call) Run(run func(msgID message.MessageID)) *MockMutableMessage_IntoImmutableMessage_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(message.MessageID))
})
return _c
}
func (_c *MockMutableMessage_IntoImmutableMessage_Call) Return(_a0 message.ImmutableMessage) *MockMutableMessage_IntoImmutableMessage_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMutableMessage_IntoImmutableMessage_Call) RunAndReturn(run func(message.MessageID) message.ImmutableMessage) *MockMutableMessage_IntoImmutableMessage_Call {
_c.Call.Return(run)
return _c
}
// MessageType provides a mock function with given fields:
func (_m *MockMutableMessage) MessageType() message.MessageType {
ret := _m.Called()
@ -188,6 +232,50 @@ func (_c *MockMutableMessage_Properties_Call) RunAndReturn(run func() message.Pr
return _c
}
// WithLastConfirmed provides a mock function with given fields: id
func (_m *MockMutableMessage) WithLastConfirmed(id message.MessageID) message.MutableMessage {
ret := _m.Called(id)
var r0 message.MutableMessage
if rf, ok := ret.Get(0).(func(message.MessageID) message.MutableMessage); ok {
r0 = rf(id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MutableMessage)
}
}
return r0
}
// MockMutableMessage_WithLastConfirmed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WithLastConfirmed'
type MockMutableMessage_WithLastConfirmed_Call struct {
*mock.Call
}
// WithLastConfirmed is a helper method to define mock.On call
// - id message.MessageID
func (_e *MockMutableMessage_Expecter) WithLastConfirmed(id interface{}) *MockMutableMessage_WithLastConfirmed_Call {
return &MockMutableMessage_WithLastConfirmed_Call{Call: _e.mock.On("WithLastConfirmed", id)}
}
func (_c *MockMutableMessage_WithLastConfirmed_Call) Run(run func(id message.MessageID)) *MockMutableMessage_WithLastConfirmed_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(message.MessageID))
})
return _c
}
func (_c *MockMutableMessage_WithLastConfirmed_Call) Return(_a0 message.MutableMessage) *MockMutableMessage_WithLastConfirmed_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMutableMessage_WithLastConfirmed_Call) RunAndReturn(run func(message.MessageID) message.MutableMessage) *MockMutableMessage_WithLastConfirmed_Call {
_c.Call.Return(run)
return _c
}
// WithTimeTick provides a mock function with given fields: tt
func (_m *MockMutableMessage) WithTimeTick(tt uint64) message.MutableMessage {
ret := _m.Called(tt)

View File

@ -1,18 +0,0 @@
syntax = "proto3";
package milvus.proto.log;
option go_package = "github.com/milvus-io/milvus/internal/proto/logpb";
import "milvus.proto";
import "google/protobuf/empty.proto";
//
// Common
//
// Message is the basic unit of communication between publisher and consumer.
message Message {
bytes payload = 1; // message body
map<string, string> properties = 2; // message properties
}

View File

@ -0,0 +1,45 @@
syntax = "proto3";
package milvus.proto.log;
option go_package = "github.com/milvus-io/milvus/internal/proto/streamingpb";
import "milvus.proto";
import "google/protobuf/empty.proto";
//
// Common
//
// MessageID is the unique identifier of a message.
message MessageID {
bytes id = 1;
}
// Message is the basic unit of communication between publisher and consumer.
message Message {
bytes payload = 1; // message body
map<string, string> properties = 2; // message properties
}
// PChannelInfo is the information of a pchannel info.
message PChannelInfo {
string name = 1; // channel name
int64 term = 2; // A monotonic increasing term, every time the channel is recovered or moved to another streamingnode, the term will increase by meta server.
int64 serverID = 3; // The log node id address of the channel.
repeated VChannelInfo vChannelInfos = 4; // PChannel related vchannels.
}
// VChannelInfo is the information of a vchannel info.
message VChannelInfo {
string name = 1;
}
message DeliverPolicy {
oneof policy {
google.protobuf.Empty all = 1; // deliver all messages.
google.protobuf.Empty latest = 2; // deliver the latest message.
MessageID startFrom = 3; // deliver message from this message id. [startFrom, ...]
MessageID startAfter = 4; // deliver message after this message id. (startAfter, ...]
}
}

View File

@ -0,0 +1,42 @@
package streamingpb
import (
"google.golang.org/protobuf/types/known/emptypb"
)
const (
ServiceMethodPrefix = "/milvus.proto.log"
InitialTerm = int64(-1)
)
func NewDeliverAll() *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_All{
All: &emptypb.Empty{},
},
}
}
func NewDeliverLatest() *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_Latest{
Latest: &emptypb.Empty{},
},
}
}
func NewDeliverStartFrom(messageID *MessageID) *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_StartFrom{
StartFrom: messageID,
},
}
}
func NewDeliverStartAfter(messageID *MessageID) *DeliverPolicy {
return &DeliverPolicy{
Policy: &DeliverPolicy_StartAfter{
StartAfter: messageID,
},
}
}

View File

@ -0,0 +1,66 @@
# WAL
`wal` package is the basic defination of wal interface of milvus streamingnode.
## Project arrangement
- `/`: only define exposed interfaces.
- `/walimpls/`: define the underlying message system interfaces need to be implemented.
- `/registry/`: A static lifetime registry to regsiter new implementation for inverting dependency.
- `/adaptor/`: adaptors to implement `wal` interface from `walimpls` interface
- `/helper/`: A utility used to help developer to implement `walimpls` conveniently.
- `/utility/`: A utility code for common logic or data structure.
## Lifetime Of Interfaces
- `OpenerBuilder` has a static lifetime in a programs:
- `Opener` keep same lifetime with underlying resources (such as mq client).
- `WAL` keep same lifetime with underlying writer of wal, and it's lifetime is always included in related `Opener`.
- `Scanner` keep same lifetime with underlying reader of wal, and it's lifetime is always included in related `WAL`.
## Add New Implemetation Of WAL
developper who want to add a new implementation of `wal` should implements the `walimpls` package interfaces. following interfaces is required:
- `walimpls.OpenerBuilderImpls`
- `walimpls.OpenerImpls`
- `walimpls.ScannerImpls`
- `walimpls.WALImpls`
`OpenerBuilderImpls` create `OpenerImpls`; `OpenerImpls` creates `WALImpls`; `WALImpls` create `ScannerImpls`.
Then register the implmentation of `walimpls.OpenerBuilderImpls` into `registry` package.
```
var _ OpenerBuilderImpls = b{};
registry.RegisterBuilder(b{})
```
All things have been done.
## Use WAL
```
name := "your builder name"
var yourCh *streamingpb.PChannelInfo
opener, err := registry.MustGetBuilder(name).Build()
if err != nil {
panic(err)
}
ctx := context.Background()
logger, err := opener.Open(ctx, wal.OpenOption{
Channel: yourCh
})
if err != nil {
panic(err)
}
```
## Adaptor
package `adaptor` is used to adapt `walimpls` and `wal` together.
common wal function should be implement by it. Such as:
- lifetime management
- interceptor implementation
- scanner wrapped up
- write ahead cache implementation

View File

@ -0,0 +1,32 @@
package adaptor
import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
)
var _ wal.OpenerBuilder = (*builderAdaptorImpl)(nil)
func AdaptImplsToBuilder(builder walimpls.OpenerBuilderImpls) wal.OpenerBuilder {
return builderAdaptorImpl{
builder: builder,
}
}
type builderAdaptorImpl struct {
builder walimpls.OpenerBuilderImpls
}
func (b builderAdaptorImpl) Name() string {
return b.builder.Name()
}
func (b builderAdaptorImpl) Build() (wal.Opener, error) {
_, err := b.builder.Build()
if err != nil {
return nil, err
}
return nil, nil
// TODO: wait for implementation.
// return adaptImplsToOpener(o), nil
}

View File

@ -0,0 +1,31 @@
package wal
import (
"context"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
)
// OpenerBuilder is the interface for build wal opener.
type OpenerBuilder interface {
// Name of the wal builder, should be a lowercase string.
Name() string
Build() (Opener, error)
}
// OpenOption is the option for allocating wal instance.
type OpenOption struct {
Channel *streamingpb.PChannelInfo
InterceptorBuilders []walimpls.InterceptorBuilder // Interceptor builders to build when open.
}
// Opener is the interface for build wal instance.
type Opener interface {
// Open open a wal instance.
Open(ctx context.Context, opt *OpenOption) (WAL, error)
// Close closes the opener resources.
Close()
}

View File

@ -0,0 +1,58 @@
package helper
import "context"
// NewScannerHelper creates a new ScannerHelper.
func NewScannerHelper(scannerName string) *ScannerHelper {
ctx, cancel := context.WithCancel(context.Background())
return &ScannerHelper{
scannerName: scannerName,
ctx: ctx,
cancel: cancel,
finishCh: make(chan struct{}),
err: nil,
}
}
// ScannerHelper is a helper for scanner implementation.
type ScannerHelper struct {
scannerName string
ctx context.Context
cancel context.CancelFunc
finishCh chan struct{}
err error
}
// Context returns the context of the scanner, which will cancel when the scanner helper is closed.
func (s *ScannerHelper) Context() context.Context {
return s.ctx
}
// Name returns the name of the scanner.
func (s *ScannerHelper) Name() string {
return s.scannerName
}
// Error returns the error of the scanner.
func (s *ScannerHelper) Error() error {
<-s.finishCh
return s.err
}
// Done returns a channel that will be closed when the scanner is finished.
func (s *ScannerHelper) Done() <-chan struct{} {
return s.finishCh
}
// Close closes the scanner, block until the Finish is called.
func (s *ScannerHelper) Close() error {
s.cancel()
<-s.finishCh
return s.err
}
// Finish finishes the scanner with an error.
func (s *ScannerHelper) Finish(err error) {
s.err = err
close(s.finishCh)
}

View File

@ -0,0 +1,58 @@
package helper
import (
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
)
func TestScanner(t *testing.T) {
h := NewScannerHelper("test")
assert.NotNil(t, h.Context())
assert.Equal(t, h.Name(), "test")
assert.NotNil(t, h.Context())
select {
case <-h.Done():
t.Errorf("should not done")
return
case <-h.Context().Done():
t.Error("should not cancel")
return
default:
}
finishErr := errors.New("test")
ch := make(chan struct{})
go func() {
defer close(ch)
done := false
cancel := false
cancelCh := h.Context().Done()
doneCh := h.Done()
for i := 0; ; i += 1 {
select {
case <-doneCh:
done = true
doneCh = nil
case <-cancelCh:
cancel = true
cancelCh = nil
h.Finish(finishErr)
}
if cancel && done {
return
}
if i == 0 {
assert.True(t, cancel && !done)
} else if i == 1 {
assert.True(t, cancel && done)
}
}
}()
h.Close()
assert.ErrorIs(t, h.Error(), finishErr)
<-ch
}

View File

@ -0,0 +1,33 @@
package helper
import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
"github.com/milvus-io/milvus/pkg/log"
)
// NewWALHelper creates a new WALHelper.
func NewWALHelper(opt *walimpls.OpenOption) *WALHelper {
return &WALHelper{
logger: log.With(zap.Any("channel", opt.Channel)),
channel: opt.Channel,
}
}
// WALHelper is a helper for WAL implementation.
type WALHelper struct {
logger *log.MLogger
channel *streamingpb.PChannelInfo
}
// Channel returns the channel of the WAL.
func (w *WALHelper) Channel() *streamingpb.PChannelInfo {
return w.channel
}
// Log returns the logger of the WAL.
func (w *WALHelper) Log() *log.MLogger {
return w.logger
}

View File

@ -0,0 +1,24 @@
package helper
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
)
func TestWALHelper(t *testing.T) {
h := NewWALHelper(&walimpls.OpenOption{
Channel: &streamingpb.PChannelInfo{
Name: "test",
Term: 1,
ServerID: 1,
VChannelInfos: []*streamingpb.VChannelInfo{},
},
})
assert.NotNil(t, h.Channel())
assert.Equal(t, h.Channel().Name, "test")
assert.NotNil(t, h.Log())
}

View File

@ -0,0 +1,33 @@
package registry
import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// builders is a map of registered wal builders.
var builders typeutil.ConcurrentMap[string, wal.OpenerBuilder]
// Register registers the wal builder.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), name of builder is lowercase. If multiple Builder are
// registered with the same name, panic will occur.
func RegisterBuilder(b walimpls.OpenerBuilderImpls) {
bb := adaptor.AdaptImplsToBuilder(b)
_, loaded := builders.GetOrInsert(bb.Name(), bb)
if loaded {
panic("wal builder already registered: " + b.Name())
}
}
// MustGetBuilder returns the wal builder by name.
func MustGetBuilder(name string) wal.OpenerBuilder {
b, ok := builders.Get(name)
if !ok {
panic("wal builder not found: " + name)
}
return b
}

View File

@ -0,0 +1,48 @@
package registry
import (
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_walimpls"
)
func TestRegister(t *testing.T) {
name := "mock"
b := mock_walimpls.NewMockOpenerBuilderImpls(t)
b.EXPECT().Name().Return(name)
RegisterBuilder(b)
b2 := MustGetBuilder(name)
assert.Equal(t, b.Name(), b2.Name())
// Panic if register twice.
assert.Panics(t, func() {
RegisterBuilder(b)
})
// Panic if get not exist builder.
assert.Panics(t, func() {
MustGetBuilder("not exist")
})
// Test concurrent.
wg := sync.WaitGroup{}
count := 10
wg.Add(count)
for i := 0; i < count; i++ {
go func(i int) {
defer wg.Done()
name := fmt.Sprintf("mock_%d", i)
b := mock_walimpls.NewMockOpenerBuilderImpls(t)
b.EXPECT().Name().Return(name)
RegisterBuilder(b)
b2 := MustGetBuilder(name)
assert.Equal(t, b.Name(), b2.Name())
}(i)
}
wg.Wait()
}

View File

@ -0,0 +1,29 @@
package wal
import (
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
"github.com/milvus-io/milvus/internal/util/streamingutil/options"
)
// ReadOption is the option for reading records from the wal.
type ReadOption struct {
DeliverPolicy options.DeliverPolicy
DeliverOrder options.DeliverOrder
}
// Scanner is the interface for reading records from the wal.
type Scanner interface {
// Chan returns the channel of message.
Chan() <-chan message.ImmutableMessage
// Error returns the error of scanner failed.
// Will block until scanner is closed or Chan is dry out.
Error() error
// Done returns a channel which will be closed when scanner is finished or closed.
Done() <-chan struct{}
// Close the scanner, release the underlying resources.
// Return the error same with `Error`
Close() error
}

View File

@ -0,0 +1,28 @@
package wal
import (
"context"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
// WAL is the WAL framework interface.
// !!! Don't implement it directly, implement walimpls.WAL instead.
type WAL interface {
// Channel returns the channel assignment info of the wal.
// Should be read-only.
Channel() *streamingpb.PChannelInfo
// Append writes a record to the log.
Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)
// Append a record to the log asynchronously.
AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(message.MessageID, error))
// Read returns a scanner for reading records from the wal.
Read(ctx context.Context, deliverPolicy ReadOption) (Scanner, error)
// Close closes the wal instance.
Close()
}

View File

@ -0,0 +1,10 @@
package walimpls
// OpenerBuilderImpls is the interface for building wal opener impls.
type OpenerBuilderImpls interface {
// Name of the wal builder, should be a lowercase string.
Name() string
// Build build a opener impls instance.
Build() (OpenerImpls, error)
}

View File

@ -0,0 +1,64 @@
package walimpls
import (
"context"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
type (
// Append is the common function to append a msg to the wal.
Append = func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)
// Read is the common function to read a msg from the wal.
Read = func(ctx context.Context, opt ReadOption) (ScannerImpls, error)
)
// InterceptorBuilder is the interface to build a interceptor.
// 1. InterceptorBuilder is concurrent safe.
// 2. InterceptorBuilder can used to build a interceptor with cross-wal shared resources.
type InterceptorBuilder interface {
// Build build a interceptor with wal that interceptor will work on.
// the wal object will be sent to the interceptor builder when the wal is constructed with all interceptors.
Build(wal <-chan WALImpls) BasicInterceptor
}
type BasicInterceptor interface {
// Close the interceptor release the resources.
Close()
}
type Interceptor interface {
AppendInterceptor
BasicInterceptor
}
// AppendInterceptor is the interceptor for Append functions.
// All wal extra operations should be done by these function, such as
// 1. time tick setup.
// 2. unique primary key filter and build.
// 3. index builder.
// 4. cache sync up.
// AppendInterceptor should be lazy initialized and fast execution.
type AppendInterceptor interface {
// Execute the append operation with interceptor.
DoAppend(ctx context.Context, msg message.MutableMessage, append Append) (message.MessageID, error)
}
type InterceptorReady interface {
// Ready check if interceptor is ready.
// Close of Interceptor would not notify the ready (closed interceptor is not ready).
// So always apply timeout when waiting for ready.
// Some append interceptor may be stateful, such as index builder and unique primary key filter,
// so it need to implement the recovery logic from crash by itself before notifying ready.
// Append operation will block until ready or canceled.
// Consumer do not blocked by it.
Ready() <-chan struct{}
}
// Some interceptor may need to wait for some resource to be ready or recovery process.
type InterceptorWithReady interface {
Interceptor
InterceptorReady
}

View File

@ -0,0 +1,21 @@
package walimpls
import (
"context"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
)
// OpenOption is the option for allocating wal impls instance.
type OpenOption struct {
Channel *streamingpb.PChannelInfo // Channel to open.
}
// OpenerImpls is the interface for build WALImpls instance.
type OpenerImpls interface {
// Open open a WALImpls instance.
Open(ctx context.Context, opt *OpenOption) (WALImpls, error)
// Close release the resources.
Close()
}

View File

@ -0,0 +1,31 @@
package walimpls
import (
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
"github.com/milvus-io/milvus/internal/util/streamingutil/options"
)
type ReadOption struct {
Name string
DeliverPolicy options.DeliverPolicy
}
// ScannerImpls is the interface for reading records from the wal.
type ScannerImpls interface {
// Name returns the name of scanner.
Name() string
// Chan returns the channel of message.
Chan() <-chan message.ImmutableMessage
// Error returns the error of scanner failed.
// Will block until scanner is closed or Chan is dry out.
Error() error
// Done returns a channel which will be closed when scanner is finished or closed.
Done() <-chan struct{}
// Close the scanner, release the underlying resources.
// Return the error same with `Error`
Close() error
}

View File

@ -0,0 +1,259 @@
//go:build test
// +build test
package walimpls
import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/remeh/sizedwaitgroup"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
"github.com/milvus-io/milvus/internal/util/streamingutil/options"
)
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randString(l int) string {
builder := strings.Builder{}
for i := 0; i < l; i++ {
builder.WriteRune(letters[rand.Intn(len(letters))])
}
return builder.String()
}
type walImplsTestFramework struct {
b OpenerBuilderImpls
t *testing.T
messageCount int
}
func NewWALImplsTestFramework(t *testing.T, messageCount int, b OpenerBuilderImpls) *walImplsTestFramework {
return &walImplsTestFramework{
b: b,
t: t,
messageCount: messageCount,
}
}
// Run runs the test framework.
// if test failed, a error will be returned.
func (f walImplsTestFramework) Run() {
// create opener.
o, err := f.b.Build()
assert.NoError(f.t, err)
assert.NotNil(f.t, o)
defer o.Close()
// construct pChannel
name := "test_" + randString(4)
pChannel := &streamingpb.PChannelInfo{
Name: name,
Term: 1,
ServerID: 1,
VChannelInfos: []*streamingpb.VChannelInfo{},
}
ctx := context.Background()
// create a wal.
w, err := o.Open(ctx, &OpenOption{
Channel: pChannel,
})
assert.NoError(f.t, err)
assert.NotNil(f.t, w)
defer w.Close()
f.testReadAndWrite(ctx, w)
}
func (f walImplsTestFramework) testReadAndWrite(ctx context.Context, w WALImpls) {
// Test read and write.
wg := sync.WaitGroup{}
wg.Add(3)
var written []message.ImmutableMessage
var read1, read2 []message.ImmutableMessage
go func() {
defer wg.Done()
var err error
written, err = f.testAppend(ctx, w)
assert.NoError(f.t, err)
}()
go func() {
defer wg.Done()
var err error
read1, err = f.testRead(ctx, w, "scanner1")
assert.NoError(f.t, err)
}()
go func() {
defer wg.Done()
var err error
read2, err = f.testRead(ctx, w, "scanner2")
assert.NoError(f.t, err)
}()
wg.Wait()
f.assertSortedMessageList(read1)
f.assertSortedMessageList(read2)
sort.Sort(sortByMessageID(written))
f.assertEqualMessageList(written, read1)
f.assertEqualMessageList(written, read2)
// Test different scan policy, StartFrom.
readFromIdx := len(read1) / 2
readFromMsgID := read1[readFromIdx].MessageID()
s, err := w.Read(ctx, ReadOption{
Name: "scanner_deliver_start_from",
DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsgID),
})
assert.NoError(f.t, err)
for i := readFromIdx; i < len(read1); i++ {
msg, ok := <-s.Chan()
assert.NotNil(f.t, msg)
assert.True(f.t, ok)
assert.True(f.t, msg.MessageID().EQ(read1[i].MessageID()))
}
s.Close()
// Test different scan policy, StartAfter.
s, err = w.Read(ctx, ReadOption{
Name: "scanner_deliver_start_after",
DeliverPolicy: options.DeliverPolicyStartAfter(readFromMsgID),
})
assert.NoError(f.t, err)
for i := readFromIdx + 1; i < len(read1); i++ {
msg, ok := <-s.Chan()
assert.NotNil(f.t, msg)
assert.True(f.t, ok)
assert.True(f.t, msg.MessageID().EQ(read1[i].MessageID()))
}
s.Close()
// Test different scan policy, Latest.
s, err = w.Read(ctx, ReadOption{
Name: "scanner_deliver_latest",
DeliverPolicy: options.DeliverPolicyLatest(),
})
assert.NoError(f.t, err)
timeoutCh := time.After(1 * time.Second)
select {
case <-s.Chan():
f.t.Errorf("should be blocked")
case <-timeoutCh:
}
s.Close()
}
func (f walImplsTestFramework) assertSortedMessageList(msgs []message.ImmutableMessage) {
for i := 1; i < len(msgs); i++ {
assert.True(f.t, msgs[i-1].MessageID().LT(msgs[i].MessageID()))
}
}
func (f walImplsTestFramework) assertEqualMessageList(msgs1 []message.ImmutableMessage, msgs2 []message.ImmutableMessage) {
assert.Equal(f.t, f.messageCount, len(msgs1))
assert.Equal(f.t, f.messageCount, len(msgs2))
for i := 0; i < len(msgs1); i++ {
assert.True(f.t, msgs1[i].MessageID().EQ(msgs2[i].MessageID()))
// assert.True(f.t, bytes.Equal(msgs1[i].Payload(), msgs2[i].Payload()))
id1, ok1 := msgs1[i].Properties().Get("id")
id2, ok2 := msgs2[i].Properties().Get("id")
assert.True(f.t, ok1)
assert.True(f.t, ok2)
assert.Equal(f.t, id1, id2)
id1, ok1 = msgs1[i].Properties().Get("const")
id2, ok2 = msgs2[i].Properties().Get("const")
assert.True(f.t, ok1)
assert.True(f.t, ok2)
assert.Equal(f.t, id1, id2)
}
}
func (f walImplsTestFramework) testAppend(ctx context.Context, w WALImpls) ([]message.ImmutableMessage, error) {
ids := make([]message.ImmutableMessage, f.messageCount)
swg := sizedwaitgroup.New(5)
for i := 0; i < f.messageCount; i++ {
swg.Add()
go func(i int) {
defer swg.Done()
// ...rocksmq has a dirty implement of properties,
// without commonpb.MsgHeader, it can not work.
header := commonpb.MsgHeader{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: int64(i),
},
}
payload, err := proto.Marshal(&header)
if err != nil {
panic(err)
}
properties := map[string]string{
"id": fmt.Sprintf("%d", i),
"const": "t",
}
typ := message.MessageTypeUnknown
msg := message.NewBuilder().
WithPayload(payload).
WithProperties(properties).
WithMessageType(typ).
BuildMutable()
id, err := w.Append(ctx, msg)
assert.NoError(f.t, err)
assert.NotNil(f.t, id)
ids[i] = message.NewBuilder().
WithPayload(payload).
WithProperties(properties).
WithMessageID(id).
WithMessageType(typ).
BuildImmutable()
}(i)
}
swg.Wait()
return ids, nil
}
func (f walImplsTestFramework) testRead(ctx context.Context, w WALImpls, name string) ([]message.ImmutableMessage, error) {
s, err := w.Read(ctx, ReadOption{
Name: name,
DeliverPolicy: options.DeliverPolicyAll(),
})
assert.NoError(f.t, err)
assert.Equal(f.t, name, s.Name())
defer s.Close()
msgs := make([]message.ImmutableMessage, 0, f.messageCount)
for i := 0; i < f.messageCount; i++ {
msg, ok := <-s.Chan()
assert.NotNil(f.t, msg)
assert.True(f.t, ok)
msgs = append(msgs, msg)
}
return msgs, nil
}
type sortByMessageID []message.ImmutableMessage
func (a sortByMessageID) Len() int {
return len(a)
}
func (a sortByMessageID) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a sortByMessageID) Less(i, j int) bool {
return a[i].MessageID().LT(a[j].MessageID())
}

View File

@ -0,0 +1,23 @@
package walimpls
import (
"context"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
type WALImpls interface {
// Channel returns the channel assignment info of the wal.
// Should be read-only.
Channel() *streamingpb.PChannelInfo
// Append writes a record to the log.
Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)
// Read returns a scanner for reading records from the wal.
Read(ctx context.Context, opts ReadOption) (ScannerImpls, error)
// Close closes the wal instance.
Close()
}

View File

@ -0,0 +1,32 @@
//go:build test
// +build test
package walimplstest
import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
const (
walName = "test"
)
func init() {
// register the builder to the registry.
registry.RegisterBuilder(&openerBuilder{})
message.RegisterMessageIDUnmsarshaler(walName, UnmarshalTestMessageID)
}
var _ walimpls.OpenerBuilderImpls = &openerBuilder{}
type openerBuilder struct{}
func (o *openerBuilder) Name() string {
return walName
}
func (o *openerBuilder) Build() (walimpls.OpenerImpls, error) {
return &opener{}, nil
}

View File

@ -0,0 +1,63 @@
//go:build test
// +build test
package walimplstest
import (
"strconv"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
var _ message.MessageID = testMessageID(0)
// NewTestMessageID create a new test message id.
func NewTestMessageID(id int64) message.MessageID {
return testMessageID(id)
}
// UnmarshalTestMessageID unmarshal the message id.
func UnmarshalTestMessageID(data []byte) (message.MessageID, error) {
id, err := unmarshalTestMessageID(data)
if err != nil {
return nil, err
}
return id, nil
}
// unmashalTestMessageID unmarshal the message id.
func unmarshalTestMessageID(data []byte) (testMessageID, error) {
id, err := strconv.ParseInt(string(data), 10, 64)
if err != nil {
return 0, err
}
return testMessageID(id), nil
}
// testMessageID is the message id for rmq.
type testMessageID int64
// WALName returns the name of message id related wal.
func (id testMessageID) WALName() string {
return walName
}
// LT less than.
func (id testMessageID) LT(other message.MessageID) bool {
return id < other.(testMessageID)
}
// LTE less than or equal to.
func (id testMessageID) LTE(other message.MessageID) bool {
return id <= other.(testMessageID)
}
// EQ Equal to.
func (id testMessageID) EQ(other message.MessageID) bool {
return id == other.(testMessageID)
}
// Marshal marshal the message id.
func (id testMessageID) Marshal() []byte {
return []byte(strconv.FormatInt(int64(id), 10))
}

View File

@ -0,0 +1,64 @@
//go:build test
// +build test
package walimplstest
import (
"context"
"sync"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var logs = typeutil.NewConcurrentMap[string, *messageLog]()
func getOrCreateLogs(name string) *messageLog {
l := newMessageLog()
l, _ = logs.GetOrInsert(name, l)
return l
}
func newMessageLog() *messageLog {
return &messageLog{
cond: syncutil.NewContextCond(&sync.Mutex{}),
id: 0,
logs: make([]message.ImmutableMessage, 0),
}
}
type messageLog struct {
cond *syncutil.ContextCond
id int64
logs []message.ImmutableMessage
}
func (l *messageLog) Append(_ context.Context, msg message.MutableMessage) (message.MessageID, error) {
l.cond.LockAndBroadcast()
defer l.cond.L.Unlock()
newMessageID := NewTestMessageID(l.id)
l.id++
l.logs = append(l.logs, msg.IntoImmutableMessage(newMessageID))
return newMessageID, nil
}
func (l *messageLog) ReadAt(ctx context.Context, idx int) (message.ImmutableMessage, error) {
var msg message.ImmutableMessage
l.cond.L.Lock()
for idx >= len(l.logs) {
if err := l.cond.Wait(ctx); err != nil {
return nil, err
}
}
msg = l.logs[idx]
l.cond.L.Unlock()
return msg, nil
}
func (l *messageLog) Len() int64 {
l.cond.L.Lock()
defer l.cond.L.Unlock()
return int64(len(l.logs))
}

View File

@ -0,0 +1,26 @@
//go:build test
// +build test
package walimplstest
import (
"context"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
)
var _ walimpls.OpenerImpls = &opener{}
type opener struct{}
func (*opener) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) {
l := getOrCreateLogs(opt.Channel.GetName())
return &walImpls{
WALHelper: *helper.NewWALHelper(opt),
datas: l,
}, nil
}
func (*opener) Close() {
}

View File

@ -0,0 +1,51 @@
//go:build test
// +build test
package walimplstest
import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
var _ walimpls.ScannerImpls = &scannerImpls{}
func newScannerImpls(opts walimpls.ReadOption, data *messageLog, offset int) *scannerImpls {
s := &scannerImpls{
ScannerHelper: helper.NewScannerHelper(opts.Name),
datas: data,
ch: make(chan message.ImmutableMessage),
offset: offset,
}
go s.executeConsume()
return s
}
type scannerImpls struct {
*helper.ScannerHelper
datas *messageLog
ch chan message.ImmutableMessage
offset int
}
func (s *scannerImpls) executeConsume() {
defer close(s.ch)
for {
msg, err := s.datas.ReadAt(s.Context(), s.offset)
if err != nil {
s.Finish(nil)
return
}
s.ch <- msg
s.offset++
}
}
func (s *scannerImpls) Chan() <-chan message.ImmutableMessage {
return s.ch
}
func (s *scannerImpls) Close() error {
return s.ScannerHelper.Close()
}

View File

@ -0,0 +1,52 @@
//go:build test
// +build test
package walimplstest
import (
"context"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
var _ walimpls.WALImpls = &walImpls{}
type walImpls struct {
helper.WALHelper
datas *messageLog
}
func (w *walImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
return w.datas.Append(ctx, msg)
}
func (w *walImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) {
offset := int64(0)
switch policy := opts.DeliverPolicy.Policy.(type) {
case *streamingpb.DeliverPolicy_All:
offset = 0
case *streamingpb.DeliverPolicy_Latest:
offset = w.datas.Len()
case *streamingpb.DeliverPolicy_StartFrom:
id, err := unmarshalTestMessageID(policy.StartFrom.Id)
if err != nil {
return nil, err
}
offset = int64(id)
case *streamingpb.DeliverPolicy_StartAfter:
id, err := unmarshalTestMessageID(policy.StartAfter.Id)
if err != nil {
return nil, err
}
offset = int64(id + 1)
}
return newScannerImpls(
opts, w.datas, int(offset),
), nil
}
func (w *walImpls) Close() {
}

View File

@ -0,0 +1,11 @@
package walimplstest
import (
"testing"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
)
func TestWALImplsTest(t *testing.T) {
walimpls.NewWALImplsTestFramework(t, 100, &openerBuilder{}).Run()
}

View File

@ -0,0 +1,28 @@
quiet: False
with-expecter: True
filename: "mock_{{.InterfaceName}}.go"
dir: "internal/mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/internal\" | dir }}/mock_{{.PackageName}}"
mockname: "Mock{{.InterfaceName}}"
outpkg: "mock_{{.PackageName}}"
packages:
github.com/milvus-io/milvus/internal/util/streamingutil/message:
interfaces:
MessageID:
ImmutableMessage:
MutableMessage:
RProperties:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
Opener:
Scanner:
WAL:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls:
interfaces:
OpenerBuilderImpls:
OpenerImpls:
ScannerImpls:
WALImpls:
Interceptor:
InterceptorWithReady:
InterceptorBuilder:

View File

@ -33,6 +33,9 @@ type MutableMessage interface {
// Properties returns the message properties.
Properties() Properties
// IntoImmutableMessage converts the mutable message to immutable message.
IntoImmutableMessage(msgID MessageID) ImmutableMessage
}
// ImmutableMessage is the read-only message interface.
@ -63,7 +66,7 @@ type ImmutableMessage interface {
Properties() RProperties
// Version returns the message format version.
// 0: old version before lognode.
// from 1: new version after lognode.
// 0: old version before streamingnode.
// from 1: new version after streamingnode.
Version() Version
}

View File

@ -7,8 +7,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message"
"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
"github.com/milvus-io/milvus/internal/mocks/util/streamingutil/mock_message"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
func TestMessage(t *testing.T) {

View File

@ -1,11 +1,17 @@
package message
import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// messageIDUnmarshaler is the map for message id unmarshaler.
var messageIDUnmarshaler typeutil.ConcurrentMap[string, MessageIDUnmarshaler]
var (
// messageIDUnmarshaler is the map for message id unmarshaler.
messageIDUnmarshaler typeutil.ConcurrentMap[string, MessageIDUnmarshaler]
ErrInvalidMessageID = errors.New("invalid message id")
)
// RegisterMessageIDUnmsarshaler register the message id unmarshaler.
func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler) {

View File

@ -7,8 +7,8 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message"
"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
"github.com/milvus-io/milvus/internal/mocks/util/streamingutil/mock_message"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
func TestRegisterMessageIDUnmarshaler(t *testing.T) {

View File

@ -49,6 +49,14 @@ func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage {
return m
}
// IntoImmutableMessage converts current message to immutable message.
func (m *messageImpl) IntoImmutableMessage(id MessageID) ImmutableMessage {
return &immutableMessageImpl{
messageImpl: *m,
id: id,
}
}
type immutableMessageImpl struct {
messageImpl
id MessageID

View File

@ -3,7 +3,7 @@ package message
import "strconv"
var (
VersionOld Version = 0 // old version before lognode.
VersionOld Version = 0 // old version before streamingnode.
VersionV1 Version = 1
)

View File

@ -0,0 +1,45 @@
package options
import (
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
)
const (
deliverOrderTimetick DeliverOrder = 1
)
// DeliverOrder is the order of delivering messages.
type (
DeliverOrder int
DeliverPolicy *streamingpb.DeliverPolicy
)
// DeliverPolicyAll delivers all messages.
func DeliverPolicyAll() DeliverPolicy {
return streamingpb.NewDeliverAll()
}
// DeliverLatest delivers the latest message.
func DeliverPolicyLatest() DeliverPolicy {
return streamingpb.NewDeliverLatest()
}
// DeliverEarliest delivers the earliest message.
func DeliverPolicyStartFrom(messageID message.MessageID) DeliverPolicy {
return streamingpb.NewDeliverStartFrom(&streamingpb.MessageID{
Id: messageID.Marshal(),
})
}
// DeliverPolicyStartAfter delivers the message after the specified message.
func DeliverPolicyStartAfter(messageID message.MessageID) DeliverPolicy {
return streamingpb.NewDeliverStartAfter(&streamingpb.MessageID{
Id: messageID.Marshal(),
})
}
// DeliverOrderTimeTick delivers messages by time tick.
func DeliverOrderTimeTick() DeliverOrder {
return deliverOrderTimetick
}

View File

@ -57,7 +57,7 @@ mkdir -p indexpb
mkdir -p datapb
mkdir -p querypb
mkdir -p planpb
mkdir -p logpb
mkdir -p streamingpb
mkdir -p $ROOT_DIR/cmd/tools/migration/legacy/legacypb
@ -75,7 +75,7 @@ ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./querypb query_coord.
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./planpb plan.proto|| { echo 'generate plan.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./segcorepb segcore.proto|| { echo 'generate segcore.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./clusteringpb clustering.proto|| { echo 'generate clustering.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./logpb log.proto|| { echo 'generate logpb.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./streamingpb streaming.proto|| { echo 'generate streamingpb.proto failed'; exit 1; }
${protoc_opt} --proto_path=$ROOT_DIR/cmd/tools/migration/legacy/ \
--go_out=plugins=grpc,paths=source_relative:../../cmd/tools/migration/legacy/legacypb legacy.proto || { echo 'generate legacy.proto failed'; exit 1; }