milvus/internal/streamingcoord/server/balancer/channel/pchannel.go

162 lines
5.2 KiB
Go

package channel
import (
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)
// newPChannelMeta builds the initial metadata for a pchannel with the given name.
// The channel starts at term 1, has no node attached, is in the UNINITIALIZED
// state and carries an empty (non-nil) assignment history.
func newPChannelMeta(name string) *PChannelMeta {
	info := &streamingpb.PChannelInfo{
		Name: name,
		Term: 1,
	}
	// Node is intentionally left unset: nothing owns the channel yet.
	raw := &streamingpb.PChannelMeta{
		Channel:   info,
		State:     streamingpb.PChannelMetaState_PCHANNEL_META_STATE_UNINITIALIZED,
		Histories: []*streamingpb.PChannelAssignmentLog{},
	}
	return &PChannelMeta{inner: raw}
}
// newPChannelMetaFromProto wraps an existing proto message in a PChannelMeta.
// The message is stored as-is (no copy is made), so the returned value shares
// it with the caller.
func newPChannelMetaFromProto(channel *streamingpb.PChannelMeta) *PChannelMeta {
	wrapped := new(PChannelMeta)
	wrapped.inner = channel
	return wrapped
}
// PChannelMeta is a read-only wrapper around the streamingpb.PChannelMeta proto,
// to be used in the balancer. Its methods only read the wrapped message.
// If you need to update a PChannelMeta, call CopyForWrite to obtain a
// mutablePChannel and modify that copy instead.
type PChannelMeta struct {
	// inner is the wrapped proto message holding channel, node, state and histories.
	inner *streamingpb.PChannelMeta
}
// Name reports the pchannel's name as stored in the wrapped channel info.
func (c *PChannelMeta) Name() string {
	info := c.inner.GetChannel()
	return info.GetName()
}
// ChannelInfo converts the wrapped proto channel info into a types.PChannelInfo.
func (c *PChannelMeta) ChannelInfo() types.PChannelInfo {
	protoInfo := c.inner.Channel
	return types.NewPChannelInfoFromProto(protoInfo)
}
// CurrentTerm reports the term currently recorded on the channel info.
func (c *PChannelMeta) CurrentTerm() int64 {
	info := c.inner.GetChannel()
	return info.GetTerm()
}
// CurrentServerID reports the server id of the node recorded on the channel.
// The value is read through the generated proto getters only; no sentinel is
// substituted here.
// NOTE(review): an earlier comment promised -1 for an unassigned channel, but
// generated proto getters normally yield the zero value (0) when the node is
// unset — confirm which value callers expect.
func (c *PChannelMeta) CurrentServerID() int64 {
	node := c.inner.GetNode()
	return node.GetServerId()
}
// CurrentAssignment pairs the channel info with the node currently recorded
// on the metadata, both converted from their proto representation.
func (c *PChannelMeta) CurrentAssignment() types.PChannelInfoAssigned {
	var assignment types.PChannelInfoAssigned
	assignment.Channel = types.NewPChannelInfoFromProto(c.inner.Channel)
	assignment.Node = types.NewStreamingNodeInfoFromProto(c.inner.Node)
	return assignment
}
// AssignHistories converts every recorded assignment log into a
// types.PChannelInfoAssigned. Each entry carries the channel's name together
// with the term and node stored in that log. The result is always non-nil and
// has one element per history entry, in the same order.
func (c *PChannelMeta) AssignHistories() []types.PChannelInfoAssigned {
	logs := c.inner.Histories
	channelName := c.inner.GetChannel().GetName()

	converted := make([]types.PChannelInfoAssigned, len(logs))
	for i := range logs {
		entry := logs[i]
		converted[i] = types.PChannelInfoAssigned{
			Channel: types.PChannelInfo{
				Name: channelName,
				Term: entry.Term,
			},
			Node: types.NewStreamingNodeInfoFromProto(entry.Node),
		}
	}
	return converted
}
// IsAssigned reports whether the channel's state is exactly ASSIGNED.
func (c *PChannelMeta) IsAssigned() bool {
	const assigned = streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
	return assigned == c.inner.State
}
// State reports the state currently stored on the channel metadata.
func (c *PChannelMeta) State() streamingpb.PChannelMetaState {
	current := c.inner.State
	return current
}
// CopyForWrite clones the wrapped proto message with proto.Clone and returns
// the clone as a mutablePChannel, so that modifications made through it do not
// touch the message held by this PChannelMeta.
func (c *PChannelMeta) CopyForWrite() *mutablePChannel {
	cloned := proto.Clone(c.inner).(*streamingpb.PChannelMeta)
	writable := &PChannelMeta{inner: cloned}
	return &mutablePChannel{PChannelMeta: writable}
}
// mutablePChannel is the mutable counterpart of PChannelMeta, used to update
// the channel info. It embeds *PChannelMeta, so all read accessors remain
// available, and adds the methods that modify the wrapped message.
// Instances are created by PChannelMeta.CopyForWrite.
type mutablePChannel struct {
	*PChannelMeta
}
// TryAssignToServerID starts assigning the channel to the given streaming node.
//
// It returns false and changes nothing when the channel is already in the
// ASSIGNED state on a node with the same server id. Otherwise it returns true
// after:
//   - appending the current term and node to the histories, unless the channel
//     is still UNINITIALIZED,
//   - incrementing the channel term,
//   - recording the new node, and
//   - moving the channel into the ASSIGNING state.
func (m *mutablePChannel) TryAssignToServerID(streamingNode types.StreamingNodeInfo) bool {
	alreadyOnNode := m.CurrentServerID() == streamingNode.ServerID &&
		m.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
	if alreadyOnNode {
		// Nothing to do: the channel is already assigned to this server.
		return false
	}

	meta := m.inner
	if meta.State != streamingpb.PChannelMetaState_PCHANNEL_META_STATE_UNINITIALIZED {
		// The channel has been initialized before, so keep a log of the
		// term/node pair that is about to be replaced.
		previous := &streamingpb.PChannelAssignmentLog{
			Term: meta.Channel.Term,
			Node: meta.Node,
		}
		meta.Histories = append(meta.Histories, previous)
	}

	// Switch the channel into the assigning state under a new term.
	meta.Channel.Term++
	meta.Node = types.NewProtoFromStreamingNodeInfo(streamingNode)
	meta.State = streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING
	return true
}
// AssignToServerDone completes an in-progress assignment.
//
// When the channel is in the ASSIGNING state, the recorded histories are
// converted and returned, the stored histories are reset to an empty slice and
// the state becomes ASSIGNED. In any other state nothing is modified and nil
// is returned.
func (m *mutablePChannel) AssignToServerDone() []types.PChannelInfoAssigned {
	if m.inner.State != streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING {
		return nil
	}

	drained := make([]types.PChannelInfoAssigned, 0, len(m.inner.Histories))
	for _, entry := range m.inner.Histories {
		record := types.PChannelInfoAssigned{
			Channel: types.PChannelInfo{
				// The channel name is read per entry on purpose, so it is
				// only dereferenced when there is at least one history.
				Name: m.inner.Channel.Name,
				Term: entry.GetTerm(),
			},
			Node: types.NewStreamingNodeInfoFromProto(entry.Node),
		}
		drained = append(drained, record)
	}

	m.inner.Histories = []*streamingpb.PChannelAssignmentLog{}
	m.inner.State = streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
	return drained
}
// MarkAsUnavailable moves the channel into the UNAVAILABLE state, but only
// when it is currently ASSIGNED and its current term equals the given term.
// In every other case the call is a no-op.
func (m *mutablePChannel) MarkAsUnavailable(term int64) {
	if m.inner.State != streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED || m.CurrentTerm() != term {
		return
	}
	m.inner.State = streamingpb.PChannelMetaState_PCHANNEL_META_STATE_UNAVAILABLE
}
// IntoRawMeta hands out the wrapped proto message and detaches it from this
// mutablePChannel: the embedded PChannelMeta is set to nil, so the
// mutablePChannel is no longer usable after the call.
func (m *mutablePChannel) IntoRawMeta() *streamingpb.PChannelMeta {
	raw := m.PChannelMeta.inner
	m.PChannelMeta = nil
	return raw
}