mirror of https://github.com/milvus-io/milvus.git
243 lines
8.6 KiB
Go
243 lines
8.6 KiB
Go
package message
|
|
|
|
import (
|
|
"reflect"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
|
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
|
)
|
|
|
|
// NewMutableMessage creates a new mutable message.
|
|
// !!! Only used at server side for streamingnode internal service, don't use it at client side.
|
|
func NewMutableMessage(payload []byte, properties map[string]string) MutableMessage {
|
|
return &messageImpl{
|
|
payload: payload,
|
|
properties: properties,
|
|
}
|
|
}
|
|
|
|
// NewImmutableMessage creates a new immutable message.
|
|
// !!! Only used at server side for streaming internal service, don't use it at client side.
|
|
func NewImmutableMesasge(
|
|
id MessageID,
|
|
payload []byte,
|
|
properties map[string]string,
|
|
) ImmutableMessage {
|
|
return &immutableMessageImpl{
|
|
id: id,
|
|
messageImpl: messageImpl{
|
|
payload: payload,
|
|
properties: properties,
|
|
},
|
|
}
|
|
}
|
|
|
|
// List all type-safe mutable message builders here.
|
|
var (
|
|
NewTimeTickMessageBuilderV1 = createNewMessageBuilderV1[*TimeTickMessageHeader, *msgpb.TimeTickMsg]()
|
|
NewInsertMessageBuilderV1 = createNewMessageBuilderV1[*InsertMessageHeader, *msgpb.InsertRequest]()
|
|
NewDeleteMessageBuilderV1 = createNewMessageBuilderV1[*DeleteMessageHeader, *msgpb.DeleteRequest]()
|
|
NewCreateCollectionMessageBuilderV1 = createNewMessageBuilderV1[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]()
|
|
NewDropCollectionMessageBuilderV1 = createNewMessageBuilderV1[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]()
|
|
NewCreatePartitionMessageBuilderV1 = createNewMessageBuilderV1[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]()
|
|
NewDropPartitionMessageBuilderV1 = createNewMessageBuilderV1[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]()
|
|
NewFlushMessageBuilderV2 = createNewMessageBuilderV2[*FlushMessageHeader, *FlushMessageBody]()
|
|
NewManualFlushMessageBuilderV2 = createNewMessageBuilderV2[*ManualFlushMessageHeader, *ManualFlushMessageBody]()
|
|
NewBeginTxnMessageBuilderV2 = createNewMessageBuilderV2[*BeginTxnMessageHeader, *BeginTxnMessageBody]()
|
|
NewCommitTxnMessageBuilderV2 = createNewMessageBuilderV2[*CommitTxnMessageHeader, *CommitTxnMessageBody]()
|
|
NewRollbackTxnMessageBuilderV2 = createNewMessageBuilderV2[*RollbackTxnMessageHeader, *RollbackTxnMessageBody]()
|
|
newTxnMessageBuilderV2 = createNewMessageBuilderV2[*TxnMessageHeader, *TxnMessageBody]()
|
|
)
|
|
|
|
// createNewMessageBuilderV1 creates a new message builder with v1 marker.
|
|
func createNewMessageBuilderV1[H proto.Message, B proto.Message]() func() *mutableMesasgeBuilder[H, B] {
|
|
return func() *mutableMesasgeBuilder[H, B] {
|
|
return newMutableMessageBuilder[H, B](VersionV1)
|
|
}
|
|
}
|
|
|
|
// List all type-safe mutable message builders here.
|
|
func createNewMessageBuilderV2[H proto.Message, B proto.Message]() func() *mutableMesasgeBuilder[H, B] {
|
|
return func() *mutableMesasgeBuilder[H, B] {
|
|
return newMutableMessageBuilder[H, B](VersionV2)
|
|
}
|
|
}
|
|
|
|
// newMutableMessageBuilder creates a new builder.
|
|
// Should only used at client side.
|
|
func newMutableMessageBuilder[H proto.Message, B proto.Message](v Version) *mutableMesasgeBuilder[H, B] {
|
|
var h H
|
|
messageType := mustGetMessageTypeFromHeader(h)
|
|
properties := make(propertiesImpl)
|
|
properties.Set(messageTypeKey, messageType.marshal())
|
|
properties.Set(messageVersion, v.String())
|
|
return &mutableMesasgeBuilder[H, B]{
|
|
properties: properties,
|
|
}
|
|
}
|
|
|
|
// mutableMesasgeBuilder is the builder for message.
|
|
type mutableMesasgeBuilder[H proto.Message, B proto.Message] struct {
|
|
header H
|
|
body B
|
|
properties propertiesImpl
|
|
broadcast bool
|
|
}
|
|
|
|
// WithMessageHeader creates a new builder with determined message type.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithHeader(h H) *mutableMesasgeBuilder[H, B] {
|
|
b.header = h
|
|
return b
|
|
}
|
|
|
|
// WithBody creates a new builder with message body.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithBody(body B) *mutableMesasgeBuilder[H, B] {
|
|
b.body = body
|
|
return b
|
|
}
|
|
|
|
// WithVChannel creates a new builder with virtual channel.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithVChannel(vchannel string) *mutableMesasgeBuilder[H, B] {
|
|
if b.broadcast {
|
|
panic("a broadcast message cannot hold vchannel")
|
|
}
|
|
b.WithProperty(messageVChannel, vchannel)
|
|
return b
|
|
}
|
|
|
|
// WithBroadcast creates a new builder with broadcast property.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithBroadcast() *mutableMesasgeBuilder[H, B] {
|
|
b.broadcast = true
|
|
return b
|
|
}
|
|
|
|
// WithProperty creates a new builder with message property.
|
|
// A key started with '_' is reserved for streaming system, should never used at user of client.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithProperty(key string, val string) *mutableMesasgeBuilder[H, B] {
|
|
b.properties.Set(key, val)
|
|
return b
|
|
}
|
|
|
|
// WithProperties creates a new builder with message properties.
|
|
// A key started with '_' is reserved for streaming system, should never used at user of client.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithProperties(kvs map[string]string) *mutableMesasgeBuilder[H, B] {
|
|
for key, val := range kvs {
|
|
b.properties.Set(key, val)
|
|
}
|
|
return b
|
|
}
|
|
|
|
// BuildMutable builds a mutable message.
|
|
// Panic if not set payload and message type.
|
|
// should only used at client side.
|
|
func (b *mutableMesasgeBuilder[H, B]) BuildMutable() (MutableMessage, error) {
|
|
// payload and header must be a pointer
|
|
if reflect.ValueOf(b.header).IsNil() {
|
|
panic("message builder not ready for header field")
|
|
}
|
|
if reflect.ValueOf(b.body).IsNil() {
|
|
panic("message builder not ready for body field")
|
|
}
|
|
if !b.broadcast && !b.properties.Exist(messageVChannel) {
|
|
panic("a non broadcast message builder not ready for vchannel field")
|
|
}
|
|
|
|
// setup header.
|
|
sp, err := EncodeProto(b.header)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encode header")
|
|
}
|
|
b.properties.Set(messageHeader, sp)
|
|
|
|
payload, err := proto.Marshal(b.body)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to marshal body")
|
|
}
|
|
return &messageImpl{
|
|
payload: payload,
|
|
properties: b.properties,
|
|
}, nil
|
|
}
|
|
|
|
// NewImmutableTxnMessageBuilder creates a new txn builder.
|
|
func NewImmutableTxnMessageBuilder(begin ImmutableBeginTxnMessageV2) *ImmutableTxnMessageBuilder {
|
|
return &ImmutableTxnMessageBuilder{
|
|
txnCtx: *begin.TxnContext(),
|
|
begin: begin,
|
|
messages: make([]ImmutableMessage, 0),
|
|
}
|
|
}
|
|
|
|
// ImmutableTxnMessageBuilder is a builder for txn message.
|
|
type ImmutableTxnMessageBuilder struct {
|
|
txnCtx TxnContext
|
|
begin ImmutableBeginTxnMessageV2
|
|
messages []ImmutableMessage
|
|
}
|
|
|
|
// ExpiredTimeTick returns the expired time tick of the txn.
|
|
func (b *ImmutableTxnMessageBuilder) ExpiredTimeTick() uint64 {
|
|
if len(b.messages) > 0 {
|
|
return tsoutil.AddPhysicalDurationOnTs(b.messages[len(b.messages)-1].TimeTick(), b.txnCtx.Keepalive)
|
|
}
|
|
return tsoutil.AddPhysicalDurationOnTs(b.begin.TimeTick(), b.txnCtx.Keepalive)
|
|
}
|
|
|
|
// Push pushes a message into the txn builder.
|
|
func (b *ImmutableTxnMessageBuilder) Add(msg ImmutableMessage) *ImmutableTxnMessageBuilder {
|
|
b.messages = append(b.messages, msg)
|
|
return b
|
|
}
|
|
|
|
// EstimateSize estimates the size of the txn message.
|
|
func (b *ImmutableTxnMessageBuilder) EstimateSize() int {
|
|
size := b.begin.EstimateSize()
|
|
for _, m := range b.messages {
|
|
size += m.EstimateSize()
|
|
}
|
|
return size
|
|
}
|
|
|
|
// Build builds a txn message.
|
|
func (b *ImmutableTxnMessageBuilder) Build(commit ImmutableCommitTxnMessageV2) (ImmutableTxnMessage, error) {
|
|
msg, err := newImmutableTxnMesasgeFromWAL(b.begin, b.messages, commit)
|
|
b.begin = nil
|
|
b.messages = nil
|
|
return msg, err
|
|
}
|
|
|
|
// newImmutableTxnMesasgeFromWAL creates a new immutable transaction message.
|
|
func newImmutableTxnMesasgeFromWAL(
|
|
begin ImmutableBeginTxnMessageV2,
|
|
body []ImmutableMessage,
|
|
commit ImmutableCommitTxnMessageV2,
|
|
) (ImmutableTxnMessage, error) {
|
|
// combine begin and commit messages into one.
|
|
msg, err := newTxnMessageBuilderV2().
|
|
WithHeader(&TxnMessageHeader{}).
|
|
WithBody(&TxnMessageBody{}).
|
|
WithVChannel(begin.VChannel()).
|
|
BuildMutable()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// we don't need to modify the begin message's timetick, but set all the timetick of body messages.
|
|
for _, m := range body {
|
|
m.(*immutableMessageImpl).overwriteTimeTick(commit.TimeTick())
|
|
m.(*immutableMessageImpl).overwriteLastConfirmedMessageID(commit.LastConfirmedMessageID())
|
|
}
|
|
immutableMsg := msg.WithTimeTick(commit.TimeTick()).
|
|
WithLastConfirmed(commit.LastConfirmedMessageID()).
|
|
WithTxnContext(*commit.TxnContext()).
|
|
IntoImmutableMessage(commit.MessageID())
|
|
return &immutableTxnMessageImpl{
|
|
immutableMessageImpl: *immutableMsg.(*immutableMessageImpl),
|
|
begin: begin,
|
|
messages: body,
|
|
commit: commit,
|
|
}, nil
|
|
}
|