enhance: new messsage interface for log service (#33286)

issue: #33285

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/33749/head
chyezh 2024-06-11 10:38:01 +08:00 committed by GitHub
parent 8ca5ced821
commit 2b7ee1968f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1773 additions and 1 deletions

View File

@ -511,7 +511,10 @@ generate-mockery-chunk-manager: getdeps
generate-mockery-pkg:
$(MAKE) -C pkg generate-mockery
generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg
generate-mockery-log:
$(INSTALL_PATH)/mockery --config $(PWD)/internal/logservice/.mockery.yaml
generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-log
generate-yaml: milvus-tools
@echo "Updating milvus config yaml"

View File

@ -0,0 +1,13 @@
quiet: False
with-expecter: True
filename: "mock_{{.InterfaceName}}.go"
dir: "internal/mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/internal\" | dir }}/mock_{{.PackageName}}"
mockname: "Mock{{.InterfaceName}}"
outpkg: "mock_{{.PackageName}}"
packages:
github.com/milvus-io/milvus/internal/util/logserviceutil/message:
interfaces:
MessageID:
ImmutableMessage:
MutableMessage:
RProperties:

View File

@ -0,0 +1,412 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_message
import (
message "github.com/milvus-io/milvus/internal/util/logserviceutil/message"
mock "github.com/stretchr/testify/mock"
)
// MockImmutableMessage is an autogenerated mock type for the ImmutableMessage type
type MockImmutableMessage struct {
mock.Mock
}
type MockImmutableMessage_Expecter struct {
mock *mock.Mock
}
func (_m *MockImmutableMessage) EXPECT() *MockImmutableMessage_Expecter {
return &MockImmutableMessage_Expecter{mock: &_m.Mock}
}
// EstimateSize provides a mock function with given fields:
func (_m *MockImmutableMessage) EstimateSize() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// MockImmutableMessage_EstimateSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EstimateSize'
type MockImmutableMessage_EstimateSize_Call struct {
*mock.Call
}
// EstimateSize is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) EstimateSize() *MockImmutableMessage_EstimateSize_Call {
return &MockImmutableMessage_EstimateSize_Call{Call: _e.mock.On("EstimateSize")}
}
func (_c *MockImmutableMessage_EstimateSize_Call) Run(run func()) *MockImmutableMessage_EstimateSize_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_EstimateSize_Call) Return(_a0 int) *MockImmutableMessage_EstimateSize_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_EstimateSize_Call) RunAndReturn(run func() int) *MockImmutableMessage_EstimateSize_Call {
_c.Call.Return(run)
return _c
}
// LastConfirmedMessageID provides a mock function with given fields:
func (_m *MockImmutableMessage) LastConfirmedMessageID() message.MessageID {
ret := _m.Called()
var r0 message.MessageID
if rf, ok := ret.Get(0).(func() message.MessageID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MessageID)
}
}
return r0
}
// MockImmutableMessage_LastConfirmedMessageID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LastConfirmedMessageID'
type MockImmutableMessage_LastConfirmedMessageID_Call struct {
*mock.Call
}
// LastConfirmedMessageID is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) LastConfirmedMessageID() *MockImmutableMessage_LastConfirmedMessageID_Call {
return &MockImmutableMessage_LastConfirmedMessageID_Call{Call: _e.mock.On("LastConfirmedMessageID")}
}
func (_c *MockImmutableMessage_LastConfirmedMessageID_Call) Run(run func()) *MockImmutableMessage_LastConfirmedMessageID_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_LastConfirmedMessageID_Call) Return(_a0 message.MessageID) *MockImmutableMessage_LastConfirmedMessageID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_LastConfirmedMessageID_Call) RunAndReturn(run func() message.MessageID) *MockImmutableMessage_LastConfirmedMessageID_Call {
_c.Call.Return(run)
return _c
}
// MessageID provides a mock function with given fields:
func (_m *MockImmutableMessage) MessageID() message.MessageID {
ret := _m.Called()
var r0 message.MessageID
if rf, ok := ret.Get(0).(func() message.MessageID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MessageID)
}
}
return r0
}
// MockImmutableMessage_MessageID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageID'
type MockImmutableMessage_MessageID_Call struct {
*mock.Call
}
// MessageID is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) MessageID() *MockImmutableMessage_MessageID_Call {
return &MockImmutableMessage_MessageID_Call{Call: _e.mock.On("MessageID")}
}
func (_c *MockImmutableMessage_MessageID_Call) Run(run func()) *MockImmutableMessage_MessageID_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_MessageID_Call) Return(_a0 message.MessageID) *MockImmutableMessage_MessageID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_MessageID_Call) RunAndReturn(run func() message.MessageID) *MockImmutableMessage_MessageID_Call {
_c.Call.Return(run)
return _c
}
// MessageType provides a mock function with given fields:
func (_m *MockImmutableMessage) MessageType() message.MessageType {
ret := _m.Called()
var r0 message.MessageType
if rf, ok := ret.Get(0).(func() message.MessageType); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(message.MessageType)
}
return r0
}
// MockImmutableMessage_MessageType_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageType'
type MockImmutableMessage_MessageType_Call struct {
*mock.Call
}
// MessageType is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) MessageType() *MockImmutableMessage_MessageType_Call {
return &MockImmutableMessage_MessageType_Call{Call: _e.mock.On("MessageType")}
}
func (_c *MockImmutableMessage_MessageType_Call) Run(run func()) *MockImmutableMessage_MessageType_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_MessageType_Call) Return(_a0 message.MessageType) *MockImmutableMessage_MessageType_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_MessageType_Call) RunAndReturn(run func() message.MessageType) *MockImmutableMessage_MessageType_Call {
_c.Call.Return(run)
return _c
}
// Payload provides a mock function with given fields:
func (_m *MockImmutableMessage) Payload() []byte {
ret := _m.Called()
var r0 []byte
if rf, ok := ret.Get(0).(func() []byte); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
return r0
}
// MockImmutableMessage_Payload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Payload'
type MockImmutableMessage_Payload_Call struct {
*mock.Call
}
// Payload is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) Payload() *MockImmutableMessage_Payload_Call {
return &MockImmutableMessage_Payload_Call{Call: _e.mock.On("Payload")}
}
func (_c *MockImmutableMessage_Payload_Call) Run(run func()) *MockImmutableMessage_Payload_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_Payload_Call) Return(_a0 []byte) *MockImmutableMessage_Payload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_Payload_Call) RunAndReturn(run func() []byte) *MockImmutableMessage_Payload_Call {
_c.Call.Return(run)
return _c
}
// Properties provides a mock function with given fields:
func (_m *MockImmutableMessage) Properties() message.RProperties {
ret := _m.Called()
var r0 message.RProperties
if rf, ok := ret.Get(0).(func() message.RProperties); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.RProperties)
}
}
return r0
}
// MockImmutableMessage_Properties_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Properties'
type MockImmutableMessage_Properties_Call struct {
*mock.Call
}
// Properties is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) Properties() *MockImmutableMessage_Properties_Call {
return &MockImmutableMessage_Properties_Call{Call: _e.mock.On("Properties")}
}
func (_c *MockImmutableMessage_Properties_Call) Run(run func()) *MockImmutableMessage_Properties_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_Properties_Call) Return(_a0 message.RProperties) *MockImmutableMessage_Properties_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_Properties_Call) RunAndReturn(run func() message.RProperties) *MockImmutableMessage_Properties_Call {
_c.Call.Return(run)
return _c
}
// TimeTick provides a mock function with given fields:
func (_m *MockImmutableMessage) TimeTick() uint64 {
ret := _m.Called()
var r0 uint64
if rf, ok := ret.Get(0).(func() uint64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(uint64)
}
return r0
}
// MockImmutableMessage_TimeTick_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TimeTick'
type MockImmutableMessage_TimeTick_Call struct {
*mock.Call
}
// TimeTick is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) TimeTick() *MockImmutableMessage_TimeTick_Call {
return &MockImmutableMessage_TimeTick_Call{Call: _e.mock.On("TimeTick")}
}
func (_c *MockImmutableMessage_TimeTick_Call) Run(run func()) *MockImmutableMessage_TimeTick_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_TimeTick_Call) Return(_a0 uint64) *MockImmutableMessage_TimeTick_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_TimeTick_Call) RunAndReturn(run func() uint64) *MockImmutableMessage_TimeTick_Call {
_c.Call.Return(run)
return _c
}
// Version provides a mock function with given fields:
func (_m *MockImmutableMessage) Version() message.Version {
ret := _m.Called()
var r0 message.Version
if rf, ok := ret.Get(0).(func() message.Version); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(message.Version)
}
return r0
}
// MockImmutableMessage_Version_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Version'
type MockImmutableMessage_Version_Call struct {
*mock.Call
}
// Version is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) Version() *MockImmutableMessage_Version_Call {
return &MockImmutableMessage_Version_Call{Call: _e.mock.On("Version")}
}
func (_c *MockImmutableMessage_Version_Call) Run(run func()) *MockImmutableMessage_Version_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_Version_Call) Return(_a0 message.Version) *MockImmutableMessage_Version_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_Version_Call) RunAndReturn(run func() message.Version) *MockImmutableMessage_Version_Call {
_c.Call.Return(run)
return _c
}
// WALName provides a mock function with given fields:
func (_m *MockImmutableMessage) WALName() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockImmutableMessage_WALName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WALName'
type MockImmutableMessage_WALName_Call struct {
*mock.Call
}
// WALName is a helper method to define mock.On call
func (_e *MockImmutableMessage_Expecter) WALName() *MockImmutableMessage_WALName_Call {
return &MockImmutableMessage_WALName_Call{Call: _e.mock.On("WALName")}
}
func (_c *MockImmutableMessage_WALName_Call) Run(run func()) *MockImmutableMessage_WALName_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockImmutableMessage_WALName_Call) Return(_a0 string) *MockImmutableMessage_WALName_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockImmutableMessage_WALName_Call) RunAndReturn(run func() string) *MockImmutableMessage_WALName_Call {
_c.Call.Return(run)
return _c
}
// NewMockImmutableMessage creates a new instance of MockImmutableMessage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockImmutableMessage(t interface {
mock.TestingT
Cleanup(func())
}) *MockImmutableMessage {
mock := &MockImmutableMessage{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,245 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_message
import (
message "github.com/milvus-io/milvus/internal/util/logserviceutil/message"
mock "github.com/stretchr/testify/mock"
)
// MockMessageID is an autogenerated mock type for the MessageID type
type MockMessageID struct {
mock.Mock
}
type MockMessageID_Expecter struct {
mock *mock.Mock
}
func (_m *MockMessageID) EXPECT() *MockMessageID_Expecter {
return &MockMessageID_Expecter{mock: &_m.Mock}
}
// EQ provides a mock function with given fields: _a0
func (_m *MockMessageID) EQ(_a0 message.MessageID) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(message.MessageID) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockMessageID_EQ_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EQ'
type MockMessageID_EQ_Call struct {
*mock.Call
}
// EQ is a helper method to define mock.On call
// - _a0 message.MessageID
func (_e *MockMessageID_Expecter) EQ(_a0 interface{}) *MockMessageID_EQ_Call {
return &MockMessageID_EQ_Call{Call: _e.mock.On("EQ", _a0)}
}
func (_c *MockMessageID_EQ_Call) Run(run func(_a0 message.MessageID)) *MockMessageID_EQ_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(message.MessageID))
})
return _c
}
func (_c *MockMessageID_EQ_Call) Return(_a0 bool) *MockMessageID_EQ_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMessageID_EQ_Call) RunAndReturn(run func(message.MessageID) bool) *MockMessageID_EQ_Call {
_c.Call.Return(run)
return _c
}
// LT provides a mock function with given fields: _a0
func (_m *MockMessageID) LT(_a0 message.MessageID) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(message.MessageID) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockMessageID_LT_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LT'
type MockMessageID_LT_Call struct {
*mock.Call
}
// LT is a helper method to define mock.On call
// - _a0 message.MessageID
func (_e *MockMessageID_Expecter) LT(_a0 interface{}) *MockMessageID_LT_Call {
return &MockMessageID_LT_Call{Call: _e.mock.On("LT", _a0)}
}
func (_c *MockMessageID_LT_Call) Run(run func(_a0 message.MessageID)) *MockMessageID_LT_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(message.MessageID))
})
return _c
}
func (_c *MockMessageID_LT_Call) Return(_a0 bool) *MockMessageID_LT_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMessageID_LT_Call) RunAndReturn(run func(message.MessageID) bool) *MockMessageID_LT_Call {
_c.Call.Return(run)
return _c
}
// LTE provides a mock function with given fields: _a0
func (_m *MockMessageID) LTE(_a0 message.MessageID) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(message.MessageID) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockMessageID_LTE_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LTE'
type MockMessageID_LTE_Call struct {
*mock.Call
}
// LTE is a helper method to define mock.On call
// - _a0 message.MessageID
func (_e *MockMessageID_Expecter) LTE(_a0 interface{}) *MockMessageID_LTE_Call {
return &MockMessageID_LTE_Call{Call: _e.mock.On("LTE", _a0)}
}
func (_c *MockMessageID_LTE_Call) Run(run func(_a0 message.MessageID)) *MockMessageID_LTE_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(message.MessageID))
})
return _c
}
func (_c *MockMessageID_LTE_Call) Return(_a0 bool) *MockMessageID_LTE_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMessageID_LTE_Call) RunAndReturn(run func(message.MessageID) bool) *MockMessageID_LTE_Call {
_c.Call.Return(run)
return _c
}
// Marshal provides a mock function with given fields:
func (_m *MockMessageID) Marshal() []byte {
ret := _m.Called()
var r0 []byte
if rf, ok := ret.Get(0).(func() []byte); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
return r0
}
// MockMessageID_Marshal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Marshal'
type MockMessageID_Marshal_Call struct {
*mock.Call
}
// Marshal is a helper method to define mock.On call
func (_e *MockMessageID_Expecter) Marshal() *MockMessageID_Marshal_Call {
return &MockMessageID_Marshal_Call{Call: _e.mock.On("Marshal")}
}
func (_c *MockMessageID_Marshal_Call) Run(run func()) *MockMessageID_Marshal_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockMessageID_Marshal_Call) Return(_a0 []byte) *MockMessageID_Marshal_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMessageID_Marshal_Call) RunAndReturn(run func() []byte) *MockMessageID_Marshal_Call {
_c.Call.Return(run)
return _c
}
// WALName provides a mock function with given fields:
func (_m *MockMessageID) WALName() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockMessageID_WALName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WALName'
type MockMessageID_WALName_Call struct {
*mock.Call
}
// WALName is a helper method to define mock.On call
func (_e *MockMessageID_Expecter) WALName() *MockMessageID_WALName_Call {
return &MockMessageID_WALName_Call{Call: _e.mock.On("WALName")}
}
func (_c *MockMessageID_WALName_Call) Run(run func()) *MockMessageID_WALName_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockMessageID_WALName_Call) Return(_a0 string) *MockMessageID_WALName_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMessageID_WALName_Call) RunAndReturn(run func() string) *MockMessageID_WALName_Call {
_c.Call.Return(run)
return _c
}
// NewMockMessageID creates a new instance of MockMessageID. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockMessageID(t interface {
mock.TestingT
Cleanup(func())
}) *MockMessageID {
mock := &MockMessageID{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,247 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_message
import (
message "github.com/milvus-io/milvus/internal/util/logserviceutil/message"
mock "github.com/stretchr/testify/mock"
)
// MockMutableMessage is an autogenerated mock type for the MutableMessage type
type MockMutableMessage struct {
mock.Mock
}
type MockMutableMessage_Expecter struct {
mock *mock.Mock
}
func (_m *MockMutableMessage) EXPECT() *MockMutableMessage_Expecter {
return &MockMutableMessage_Expecter{mock: &_m.Mock}
}
// EstimateSize provides a mock function with given fields:
func (_m *MockMutableMessage) EstimateSize() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// MockMutableMessage_EstimateSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EstimateSize'
type MockMutableMessage_EstimateSize_Call struct {
*mock.Call
}
// EstimateSize is a helper method to define mock.On call
func (_e *MockMutableMessage_Expecter) EstimateSize() *MockMutableMessage_EstimateSize_Call {
return &MockMutableMessage_EstimateSize_Call{Call: _e.mock.On("EstimateSize")}
}
func (_c *MockMutableMessage_EstimateSize_Call) Run(run func()) *MockMutableMessage_EstimateSize_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockMutableMessage_EstimateSize_Call) Return(_a0 int) *MockMutableMessage_EstimateSize_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMutableMessage_EstimateSize_Call) RunAndReturn(run func() int) *MockMutableMessage_EstimateSize_Call {
_c.Call.Return(run)
return _c
}
// MessageType provides a mock function with given fields:
func (_m *MockMutableMessage) MessageType() message.MessageType {
ret := _m.Called()
var r0 message.MessageType
if rf, ok := ret.Get(0).(func() message.MessageType); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(message.MessageType)
}
return r0
}
// MockMutableMessage_MessageType_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageType'
type MockMutableMessage_MessageType_Call struct {
*mock.Call
}
// MessageType is a helper method to define mock.On call
func (_e *MockMutableMessage_Expecter) MessageType() *MockMutableMessage_MessageType_Call {
return &MockMutableMessage_MessageType_Call{Call: _e.mock.On("MessageType")}
}
func (_c *MockMutableMessage_MessageType_Call) Run(run func()) *MockMutableMessage_MessageType_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockMutableMessage_MessageType_Call) Return(_a0 message.MessageType) *MockMutableMessage_MessageType_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMutableMessage_MessageType_Call) RunAndReturn(run func() message.MessageType) *MockMutableMessage_MessageType_Call {
_c.Call.Return(run)
return _c
}
// Payload provides a mock function with given fields:
func (_m *MockMutableMessage) Payload() []byte {
ret := _m.Called()
var r0 []byte
if rf, ok := ret.Get(0).(func() []byte); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
return r0
}
// MockMutableMessage_Payload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Payload'
type MockMutableMessage_Payload_Call struct {
*mock.Call
}
// Payload is a helper method to define mock.On call
func (_e *MockMutableMessage_Expecter) Payload() *MockMutableMessage_Payload_Call {
return &MockMutableMessage_Payload_Call{Call: _e.mock.On("Payload")}
}
func (_c *MockMutableMessage_Payload_Call) Run(run func()) *MockMutableMessage_Payload_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockMutableMessage_Payload_Call) Return(_a0 []byte) *MockMutableMessage_Payload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMutableMessage_Payload_Call) RunAndReturn(run func() []byte) *MockMutableMessage_Payload_Call {
_c.Call.Return(run)
return _c
}
// Properties provides a mock function with given fields:
func (_m *MockMutableMessage) Properties() message.Properties {
ret := _m.Called()
var r0 message.Properties
if rf, ok := ret.Get(0).(func() message.Properties); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.Properties)
}
}
return r0
}
// MockMutableMessage_Properties_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Properties'
type MockMutableMessage_Properties_Call struct {
*mock.Call
}
// Properties is a helper method to define mock.On call
func (_e *MockMutableMessage_Expecter) Properties() *MockMutableMessage_Properties_Call {
return &MockMutableMessage_Properties_Call{Call: _e.mock.On("Properties")}
}
func (_c *MockMutableMessage_Properties_Call) Run(run func()) *MockMutableMessage_Properties_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockMutableMessage_Properties_Call) Return(_a0 message.Properties) *MockMutableMessage_Properties_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMutableMessage_Properties_Call) RunAndReturn(run func() message.Properties) *MockMutableMessage_Properties_Call {
_c.Call.Return(run)
return _c
}
// WithTimeTick provides a mock function with given fields: tt
func (_m *MockMutableMessage) WithTimeTick(tt uint64) message.MutableMessage {
ret := _m.Called(tt)
var r0 message.MutableMessage
if rf, ok := ret.Get(0).(func(uint64) message.MutableMessage); ok {
r0 = rf(tt)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MutableMessage)
}
}
return r0
}
// MockMutableMessage_WithTimeTick_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WithTimeTick'
type MockMutableMessage_WithTimeTick_Call struct {
*mock.Call
}
// WithTimeTick is a helper method to define mock.On call
// - tt uint64
func (_e *MockMutableMessage_Expecter) WithTimeTick(tt interface{}) *MockMutableMessage_WithTimeTick_Call {
return &MockMutableMessage_WithTimeTick_Call{Call: _e.mock.On("WithTimeTick", tt)}
}
func (_c *MockMutableMessage_WithTimeTick_Call) Run(run func(tt uint64)) *MockMutableMessage_WithTimeTick_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(uint64))
})
return _c
}
func (_c *MockMutableMessage_WithTimeTick_Call) Return(_a0 message.MutableMessage) *MockMutableMessage_WithTimeTick_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockMutableMessage_WithTimeTick_Call) RunAndReturn(run func(uint64) message.MutableMessage) *MockMutableMessage_WithTimeTick_Call {
_c.Call.Return(run)
return _c
}
// NewMockMutableMessage creates a new instance of MockMutableMessage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockMutableMessage(t interface {
mock.TestingT
Cleanup(func())
}) *MockMutableMessage {
mock := &MockMutableMessage{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,169 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_message
import mock "github.com/stretchr/testify/mock"
// MockRProperties is an autogenerated mock type for the RProperties type
type MockRProperties struct {
mock.Mock
}
type MockRProperties_Expecter struct {
mock *mock.Mock
}
func (_m *MockRProperties) EXPECT() *MockRProperties_Expecter {
return &MockRProperties_Expecter{mock: &_m.Mock}
}
// Exist provides a mock function with given fields: key
func (_m *MockRProperties) Exist(key string) bool {
ret := _m.Called(key)
var r0 bool
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(key)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockRProperties_Exist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exist'
type MockRProperties_Exist_Call struct {
*mock.Call
}
// Exist is a helper method to define mock.On call
// - key string
func (_e *MockRProperties_Expecter) Exist(key interface{}) *MockRProperties_Exist_Call {
return &MockRProperties_Exist_Call{Call: _e.mock.On("Exist", key)}
}
func (_c *MockRProperties_Exist_Call) Run(run func(key string)) *MockRProperties_Exist_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockRProperties_Exist_Call) Return(_a0 bool) *MockRProperties_Exist_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRProperties_Exist_Call) RunAndReturn(run func(string) bool) *MockRProperties_Exist_Call {
_c.Call.Return(run)
return _c
}
// Get provides a mock function with given fields: key
func (_m *MockRProperties) Get(key string) (string, bool) {
ret := _m.Called(key)
var r0 string
var r1 bool
if rf, ok := ret.Get(0).(func(string) (string, bool)); ok {
return rf(key)
}
if rf, ok := ret.Get(0).(func(string) string); ok {
r0 = rf(key)
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func(string) bool); ok {
r1 = rf(key)
} else {
r1 = ret.Get(1).(bool)
}
return r0, r1
}
// MockRProperties_Get_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Get'
type MockRProperties_Get_Call struct {
*mock.Call
}
// Get is a helper method to define mock.On call
// - key string
func (_e *MockRProperties_Expecter) Get(key interface{}) *MockRProperties_Get_Call {
return &MockRProperties_Get_Call{Call: _e.mock.On("Get", key)}
}
func (_c *MockRProperties_Get_Call) Run(run func(key string)) *MockRProperties_Get_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockRProperties_Get_Call) Return(value string, ok bool) *MockRProperties_Get_Call {
_c.Call.Return(value, ok)
return _c
}
func (_c *MockRProperties_Get_Call) RunAndReturn(run func(string) (string, bool)) *MockRProperties_Get_Call {
_c.Call.Return(run)
return _c
}
// ToRawMap provides a mock function with given fields:
func (_m *MockRProperties) ToRawMap() map[string]string {
ret := _m.Called()
var r0 map[string]string
if rf, ok := ret.Get(0).(func() map[string]string); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]string)
}
}
return r0
}
// MockRProperties_ToRawMap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ToRawMap'
type MockRProperties_ToRawMap_Call struct {
*mock.Call
}
// ToRawMap is a helper method to define mock.On call
func (_e *MockRProperties_Expecter) ToRawMap() *MockRProperties_ToRawMap_Call {
return &MockRProperties_ToRawMap_Call{Call: _e.mock.On("ToRawMap")}
}
func (_c *MockRProperties_ToRawMap_Call) Run(run func()) *MockRProperties_ToRawMap_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockRProperties_ToRawMap_Call) Return(_a0 map[string]string) *MockRProperties_ToRawMap_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRProperties_ToRawMap_Call) RunAndReturn(run func() map[string]string) *MockRProperties_ToRawMap_Call {
_c.Call.Return(run)
return _c
}
// NewMockRProperties creates a new instance of MockRProperties. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockRProperties(t interface {
mock.TestingT
Cleanup(func())
}) *MockRProperties {
mock := &MockRProperties{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

18
internal/proto/log.proto Normal file
View File

@ -0,0 +1,18 @@
syntax = "proto3";
package milvus.proto.log;
option go_package = "github.com/milvus-io/milvus/internal/proto/logpb";
import "milvus.proto";
import "google/protobuf/empty.proto";
//
// Common
//
// Message is the basic unit of communication between publisher and consumer.
message Message {
bytes payload = 1; // message body
map<string, string> properties = 2; // message properties
}

View File

@ -0,0 +1,80 @@
package message
// NewBuilder creates a new builder.
func NewBuilder() *Builder {
return &Builder{
id: nil,
payload: nil,
properties: make(propertiesImpl),
}
}
// Builder is the builder for message.
type Builder struct {
id MessageID
payload []byte
properties propertiesImpl
}
// WithMessageID creates a new builder with message id.
func (b *Builder) WithMessageID(id MessageID) *Builder {
b.id = id
return b
}
// WithMessageType creates a new builder with message type.
func (b *Builder) WithMessageType(t MessageType) *Builder {
b.properties.Set(messageTypeKey, t.marshal())
return b
}
// WithProperty creates a new builder with message property.
// A key started with '_' is reserved for log system, should never used at user of client.
func (b *Builder) WithProperty(key string, val string) *Builder {
b.properties.Set(key, val)
return b
}
// WithProperties creates a new builder with message properties.
// A key started with '_' is reserved for log system, should never used at user of client.
func (b *Builder) WithProperties(kvs map[string]string) *Builder {
for key, val := range kvs {
b.properties.Set(key, val)
}
return b
}
// WithPayload creates a new builder with message payload.
func (b *Builder) WithPayload(payload []byte) *Builder {
b.payload = payload
return b
}
// BuildMutable builds a mutable message.
// Panic if set the message id.
func (b *Builder) BuildMutable() MutableMessage {
if b.id != nil {
panic("build a mutable message, message id should be nil")
}
// Set message version.
b.properties.Set(messageVersion, VersionV1.String())
return &messageImpl{
payload: b.payload,
properties: b.properties,
}
}
// BuildImmutable builds a immutable message.
// Panic if not set the message id.
func (b *Builder) BuildImmutable() ImmutableMessage {
if b.id == nil {
panic("build a immutable message, message id should not be nil")
}
return &immutableMessageImpl{
id: b.id,
messageImpl: messageImpl{
payload: b.payload,
properties: b.properties,
},
}
}

View File

@ -0,0 +1,69 @@
package message
var (
_ BasicMessage = (*messageImpl)(nil)
_ MutableMessage = (*messageImpl)(nil)
_ ImmutableMessage = (*immutableMessageImpl)(nil)
)
// BasicMessage is the basic interface of message.
type BasicMessage interface {
// MessageType returns the type of message.
MessageType() MessageType
// Message payload.
Payload() []byte
// EstimateSize returns the estimated size of message.
EstimateSize() int
}
// MutableMessage is the mutable message interface.
// Message can be modified before it is persistent by wal.
type MutableMessage interface {
BasicMessage
// WithLastConfirmed sets the last confirmed message id of current message.
// !!! preserved for log system internal usage, don't call it outside of log system.
WithLastConfirmed(id MessageID) MutableMessage
// WithTimeTick sets the time tick of current message.
// !!! preserved for log system internal usage, don't call it outside of log system.
WithTimeTick(tt uint64) MutableMessage
// Properties returns the message properties.
Properties() Properties
}
// ImmutableMessage is the read-only message interface.
// Once a message is persistent by wal, it will be immutable.
// And the message id will be assigned.
type ImmutableMessage interface {
BasicMessage
// WALName returns the name of message related wal.
WALName() string
// TimeTick returns the time tick of current message.
// Available only when the message's version greater than 0.
// Otherwise, it will panic.
TimeTick() uint64
// LastConfirmedMessageID returns the last confirmed message id of current message.
// last confirmed message is always a timetick message.
// Read from this message id will guarantee the time tick greater than this message is consumed.
// Available only when the message's version greater than 0.
// Otherwise, it will panic.
LastConfirmedMessageID() MessageID
// MessageID returns the message id of current message.
MessageID() MessageID
// Properties returns the message read only properties.
Properties() RProperties
// Version returns the message format version.
// 0: old version before lognode.
// from 1: new version after lognode.
Version() Version
}

View File

@ -0,0 +1,105 @@
package message_test
import (
"fmt"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message"
"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
)
func TestMessage(t *testing.T) {
b := message.NewBuilder()
mutableMessage := b.WithMessageType(message.MessageTypeTimeTick).
WithPayload([]byte("payload")).
WithProperties(map[string]string{"key": "value"}).
BuildMutable()
assert.Equal(t, "payload", string(mutableMessage.Payload()))
assert.True(t, mutableMessage.Properties().Exist("key"))
v, ok := mutableMessage.Properties().Get("key")
assert.Equal(t, "value", v)
assert.True(t, ok)
assert.Equal(t, message.MessageTypeTimeTick, mutableMessage.MessageType())
assert.Equal(t, 21, mutableMessage.EstimateSize())
mutableMessage.WithTimeTick(123)
v, ok = mutableMessage.Properties().Get("_tt")
assert.True(t, ok)
tt, n := proto.DecodeVarint([]byte(v))
assert.Equal(t, uint64(123), tt)
assert.Equal(t, len([]byte(v)), n)
lcMsgID := mock_message.NewMockMessageID(t)
lcMsgID.EXPECT().Marshal().Return([]byte("lcMsgID"))
mutableMessage.WithLastConfirmed(lcMsgID)
v, ok = mutableMessage.Properties().Get("_lc")
assert.True(t, ok)
assert.Equal(t, v, "lcMsgID")
msgID := mock_message.NewMockMessageID(t)
msgID.EXPECT().EQ(msgID).Return(true)
msgID.EXPECT().WALName().Return("testMsgID")
message.RegisterMessageIDUnmsarshaler("testMsgID", func(data []byte) (message.MessageID, error) {
if string(data) == "lcMsgID" {
return msgID, nil
}
panic(fmt.Sprintf("unexpected data: %s", data))
})
b = message.NewBuilder()
immutableMessage := b.WithMessageID(msgID).
WithPayload([]byte("payload")).
WithProperties(map[string]string{
"key": "value",
"_t": "1",
"_tt": string(proto.EncodeVarint(456)),
"_v": "1",
"_lc": "lcMsgID",
}).
BuildImmutable()
assert.True(t, immutableMessage.MessageID().EQ(msgID))
assert.Equal(t, "payload", string(immutableMessage.Payload()))
assert.True(t, immutableMessage.Properties().Exist("key"))
v, ok = immutableMessage.Properties().Get("key")
assert.Equal(t, "value", v)
assert.True(t, ok)
assert.Equal(t, message.MessageTypeTimeTick, immutableMessage.MessageType())
assert.Equal(t, 36, immutableMessage.EstimateSize())
assert.Equal(t, message.Version(1), immutableMessage.Version())
assert.Equal(t, uint64(456), immutableMessage.TimeTick())
assert.NotNil(t, immutableMessage.LastConfirmedMessageID())
b = message.NewBuilder()
immutableMessage = b.WithMessageID(msgID).
WithPayload([]byte("payload")).
WithProperty("key", "value").
WithProperty("_t", "1").
BuildImmutable()
assert.True(t, immutableMessage.MessageID().EQ(msgID))
assert.Equal(t, "payload", string(immutableMessage.Payload()))
assert.True(t, immutableMessage.Properties().Exist("key"))
v, ok = immutableMessage.Properties().Get("key")
assert.Equal(t, "value", v)
assert.True(t, ok)
assert.Equal(t, message.MessageTypeTimeTick, immutableMessage.MessageType())
assert.Equal(t, 18, immutableMessage.EstimateSize())
assert.Equal(t, message.Version(0), immutableMessage.Version())
assert.Panics(t, func() {
immutableMessage.TimeTick()
})
assert.Panics(t, func() {
immutableMessage.LastConfirmedMessageID()
})
assert.Panics(t, func() {
message.NewBuilder().WithMessageID(msgID).BuildMutable()
})
assert.Panics(t, func() {
message.NewBuilder().BuildImmutable()
})
}

View File

@ -0,0 +1,34 @@
package message
// Handler is used to handle message read from log.
type Handler interface {
// Handle is the callback for handling message.
Handle(msg ImmutableMessage)
// Close is called after all messages are handled or handling is interrupted.
Close()
}
var _ Handler = ChanMessageHandler(nil)
// ChanMessageHandler is a handler just forward the message into a channel.
type ChanMessageHandler chan ImmutableMessage
// Handle is the callback for handling message.
func (cmh ChanMessageHandler) Handle(msg ImmutableMessage) {
cmh <- msg
}
// Close is called after all messages are handled or handling is interrupted.
func (cmh ChanMessageHandler) Close() {
close(cmh)
}
// NopCloseHandler is a handler that do nothing when close.
type NopCloseHandler struct {
Handler
}
// Close is called after all messages are handled or handling is interrupted.
func (nch NopCloseHandler) Close() {
}

View File

@ -0,0 +1,30 @@
package message
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMessageHandler(t *testing.T) {
ch := make(chan ImmutableMessage, 100)
h := ChanMessageHandler(ch)
h.Handle(nil)
assert.Nil(t, <-ch)
h.Close()
_, ok := <-ch
assert.False(t, ok)
ch = make(chan ImmutableMessage, 100)
hNop := NopCloseHandler{
Handler: ChanMessageHandler(ch),
}
hNop.Handle(nil)
assert.Nil(t, <-ch)
hNop.Close()
select {
case <-ch:
panic("should not be closed")
default:
}
}

View File

@ -0,0 +1,46 @@
package message
import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// messageIDUnmarshaler is the map for message id unmarshaler.
var messageIDUnmarshaler typeutil.ConcurrentMap[string, MessageIDUnmarshaler]
// RegisterMessageIDUnmsarshaler register the message id unmarshaler.
func RegisterMessageIDUnmsarshaler(name string, unmarshaler MessageIDUnmarshaler) {
_, loaded := messageIDUnmarshaler.GetOrInsert(name, unmarshaler)
if loaded {
panic("MessageID Unmarshaler already registered: " + name)
}
}
// MessageIDUnmarshaler is the unmarshaler for message id.
type MessageIDUnmarshaler = func(b []byte) (MessageID, error)
// UnmsarshalMessageID unmarshal the message id.
func UnmarshalMessageID(name string, b []byte) (MessageID, error) {
unmarshaler, ok := messageIDUnmarshaler.Get(name)
if !ok {
panic("MessageID Unmarshaler not registered: " + name)
}
return unmarshaler(b)
}
// MessageID is the interface for message id.
type MessageID interface {
// WALName returns the name of message id related wal.
WALName() string
// LT less than.
LT(MessageID) bool
// LTE less than or equal to.
LTE(MessageID) bool
// EQ Equal to.
EQ(MessageID) bool
// Marshal marshal the message id.
Marshal() []byte
}

View File

@ -0,0 +1,44 @@
package message_test
import (
"bytes"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/mocks/util/logserviceutil/mock_message"
"github.com/milvus-io/milvus/internal/util/logserviceutil/message"
)
func TestRegisterMessageIDUnmarshaler(t *testing.T) {
msgID := mock_message.NewMockMessageID(t)
message.RegisterMessageIDUnmsarshaler("test", func(b []byte) (message.MessageID, error) {
if bytes.Equal(b, []byte("123")) {
return msgID, nil
}
return nil, errors.New("invalid")
})
id, err := message.UnmarshalMessageID("test", []byte("123"))
assert.NotNil(t, id)
assert.NoError(t, err)
id, err = message.UnmarshalMessageID("test", []byte("1234"))
assert.Nil(t, id)
assert.Error(t, err)
assert.Panics(t, func() {
message.UnmarshalMessageID("test1", []byte("123"))
})
assert.Panics(t, func() {
message.RegisterMessageIDUnmsarshaler("test", func(b []byte) (message.MessageID, error) {
if bytes.Equal(b, []byte("123")) {
return msgID, nil
}
return nil, errors.New("invalid")
})
})
}

View File

@ -0,0 +1,105 @@
package message
import (
"fmt"
"github.com/golang/protobuf/proto"
)
type messageImpl struct {
payload []byte
properties propertiesImpl
}
// MessageType returns the type of message.
func (m *messageImpl) MessageType() MessageType {
val, ok := m.properties.Get(messageTypeKey)
if !ok {
return MessageTypeUnknown
}
return unmarshalMessageType(val)
}
// Payload returns payload of current message.
func (m *messageImpl) Payload() []byte {
return m.payload
}
// Properties returns the message properties.
func (m *messageImpl) Properties() Properties {
return m.properties
}
// EstimateSize returns the estimated size of current message.
func (m *messageImpl) EstimateSize() int {
// TODO: more accurate size estimation.
return len(m.payload) + m.properties.EstimateSize()
}
// WithTimeTick sets the time tick of current message.
func (m *messageImpl) WithTimeTick(tt uint64) MutableMessage {
t := proto.EncodeVarint(tt)
m.properties.Set(messageTimeTick, string(t))
return m
}
// WithLastConfirmed sets the last confirmed message id of current message.
func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage {
m.properties.Set(messageLastConfirmed, string(id.Marshal()))
return m
}
type immutableMessageImpl struct {
messageImpl
id MessageID
}
// WALName returns the name of message related wal.
func (m *immutableMessageImpl) WALName() string {
return m.id.WALName()
}
// TimeTick returns the time tick of current message.
func (m *immutableMessageImpl) TimeTick() uint64 {
value, ok := m.properties.Get(messageTimeTick)
if !ok {
panic(fmt.Sprintf("there's a bug in the message codes, timetick lost in properties of message, id: %+v", m.id))
}
v := []byte(value)
tt, n := proto.DecodeVarint(v)
if n != len(v) {
panic(fmt.Sprintf("there's a bug in the message codes, dirty timetick in properties of message, id: %+v", m.id))
}
return tt
}
func (m *immutableMessageImpl) LastConfirmedMessageID() MessageID {
value, ok := m.properties.Get(messageLastConfirmed)
if !ok {
panic(fmt.Sprintf("there's a bug in the message codes, last confirmed message lost in properties of message, id: %+v", m.id))
}
id, err := UnmarshalMessageID(m.id.WALName(), []byte(value))
if err != nil {
panic(fmt.Sprintf("there's a bug in the message codes, dirty last confirmed message in properties of message, id: %+v", m.id))
}
return id
}
// MessageID returns the message id.
func (m *immutableMessageImpl) MessageID() MessageID {
return m.id
}
// Properties returns the message read only properties.
func (m *immutableMessageImpl) Properties() RProperties {
return m.properties
}
// Version returns the message format version.
func (m *immutableMessageImpl) Version() Version {
value, ok := m.properties.Get(messageVersion)
if !ok {
return VersionOld
}
return newMessageVersionFromString(value)
}

View File

@ -0,0 +1,27 @@
package message
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMessageType(t *testing.T) {
s := MessageTypeUnknown.marshal()
assert.Equal(t, "0", s)
typ := unmarshalMessageType("0")
assert.Equal(t, MessageTypeUnknown, typ)
typ = unmarshalMessageType("882s9")
assert.Equal(t, MessageTypeUnknown, typ)
}
func TestVersion(t *testing.T) {
v := newMessageVersionFromString("")
assert.Equal(t, VersionOld, v)
assert.Panics(t, func() {
newMessageVersionFromString("s1")
})
v = newMessageVersionFromString("1")
assert.Equal(t, VersionV1, v)
}

View File

@ -0,0 +1,34 @@
package message
import "strconv"
type MessageType int32
const (
MessageTypeUnknown MessageType = 0
MessageTypeTimeTick MessageType = 1
)
var messageTypeName = map[MessageType]string{
MessageTypeUnknown: "MESSAGE_TYPE_UNKNOWN",
MessageTypeTimeTick: "MESSAGE_TYPE_TIME_TICK",
}
// String implements fmt.Stringer interface.
func (t MessageType) String() string {
return messageTypeName[t]
}
// marshal marshal MessageType to string.
func (t MessageType) marshal() string {
return strconv.FormatInt(int64(t), 10)
}
// unmarshalMessageType unmarshal MessageType from string.
func unmarshalMessageType(s string) MessageType {
i, err := strconv.ParseInt(s, 10, 32)
if err != nil {
return MessageTypeUnknown
}
return MessageType(i)
}

View File

@ -0,0 +1,64 @@
package message
const (
// preserved properties
messageVersion = "_v" // message version for compatibility.
messageTypeKey = "_t" // message type key.
messageTimeTick = "_tt" // message time tick.
messageLastConfirmed = "_lc" // message last confirmed message id.
)
var (
_ RProperties = propertiesImpl{}
_ Properties = propertiesImpl{}
)
// RProperties is the read-only properties for message.
type RProperties interface {
// Get find a value by key.
Get(key string) (value string, ok bool)
// Exist check if a key exists.
Exist(key string) bool
// ToRawMap returns the raw map of properties.
ToRawMap() map[string]string
}
// Properties is the write and readable properties for message.
type Properties interface {
RProperties
// Set a key-value pair in Properties.
Set(key, value string)
}
// propertiesImpl is the implementation of Properties.
type propertiesImpl map[string]string
func (prop propertiesImpl) Get(key string) (value string, ok bool) {
value, ok = prop[key]
return
}
func (prop propertiesImpl) Exist(key string) bool {
_, ok := prop[key]
return ok
}
func (prop propertiesImpl) Set(key, value string) {
prop[key] = value
}
func (prop propertiesImpl) ToRawMap() map[string]string {
return map[string]string(prop)
}
// EstimateSize returns the estimated size of properties.
func (prop propertiesImpl) EstimateSize() int {
size := 0
for k, v := range prop {
size += len(k) + len(v)
}
return size
}

View File

@ -0,0 +1,25 @@
package message
import "strconv"
var (
VersionOld Version = 0 // old version before lognode.
VersionV1 Version = 1
)
type Version int // message version for compatibility.
func newMessageVersionFromString(s string) Version {
if s == "" {
return VersionOld
}
v, err := strconv.ParseInt(s, 10, 64)
if err != nil {
panic("unexpected message version")
}
return Version(v)
}
func (v Version) String() string {
return strconv.FormatInt(int64(v), 10)
}

View File

@ -57,6 +57,7 @@ mkdir -p indexpb
mkdir -p datapb
mkdir -p querypb
mkdir -p planpb
mkdir -p logpb
mkdir -p $ROOT_DIR/cmd/tools/migration/legacy/legacypb
@ -74,6 +75,7 @@ ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./querypb query_coord.
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./planpb plan.proto|| { echo 'generate plan.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./segcorepb segcore.proto|| { echo 'generate segcore.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./clusteringpb clustering.proto|| { echo 'generate clustering.proto failed'; exit 1; }
${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./logpb log.proto|| { echo 'generate logpb.proto failed'; exit 1; }
${protoc_opt} --proto_path=$ROOT_DIR/cmd/tools/migration/legacy/ \
--go_out=plugins=grpc,paths=source_relative:../../cmd/tools/migration/legacy/legacypb legacy.proto || { echo 'generate legacy.proto failed'; exit 1; }