mirror of https://github.com/milvus-io/milvus.git
[skip e2e]Fix policy typo (#13645)
Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>pull/13667/head
parent
cb570a5a3a
commit
c983af85c5
|
@ -108,11 +108,11 @@ func filterNode(infos []*NodeChannelInfo, nodeID int64) []*NodeChannelInfo {
|
|||
return filtered
|
||||
}
|
||||
|
||||
// ConsistentHashRegisterPolicy use a consistent hash to matain the mapping
|
||||
func ConsistentHashRegisterPolicy(hashring *consistent.Consistent) RegisterPolicy {
|
||||
// ConsistentHashRegisterPolicy use a consistent hash to maintain the mapping
|
||||
func ConsistentHashRegisterPolicy(hashRing *consistent.Consistent) RegisterPolicy {
|
||||
return func(store ROChannelStore, nodeID int64) ChannelOpSet {
|
||||
elems := formatNodeIDs(store.GetNodes())
|
||||
hashring.Set(elems)
|
||||
hashRing.Set(elems)
|
||||
|
||||
removes := make(map[int64][]*channel)
|
||||
adds := make(map[int64][]*channel)
|
||||
|
@ -129,13 +129,13 @@ func ConsistentHashRegisterPolicy(hashring *consistent.Consistent) RegisterPolic
|
|||
channelsInfo := store.GetNodesChannels()
|
||||
for _, c := range channelsInfo {
|
||||
for _, ch := range c.Channels {
|
||||
idstr, err := hashring.Get(ch.Name)
|
||||
idStr, err := hashRing.Get(ch.Name)
|
||||
if err != nil {
|
||||
log.Warn("receive error when getting from hashring",
|
||||
log.Warn("receive error when getting from hashRing",
|
||||
zap.String("channel", ch.Name), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
did, err := deformatNodeID(idstr)
|
||||
did, err := deformatNodeID(idStr)
|
||||
if err != nil {
|
||||
log.Warn("failed to deformat node id", zap.Int64("nodeID", did))
|
||||
return nil
|
||||
|
@ -202,16 +202,16 @@ func AverageAssignPolicy(store ROChannelStore, channels []*channel) ChannelOpSet
|
|||
}
|
||||
|
||||
// ConsistentHashChannelAssignPolicy use a consistent hash algorithm to determine channel assignment
|
||||
func ConsistentHashChannelAssignPolicy(hashring *consistent.Consistent) ChannelAssignPolicy {
|
||||
func ConsistentHashChannelAssignPolicy(hashRing *consistent.Consistent) ChannelAssignPolicy {
|
||||
return func(store ROChannelStore, channels []*channel) ChannelOpSet {
|
||||
hashring.Set(formatNodeIDs(store.GetNodes()))
|
||||
hashRing.Set(formatNodeIDs(store.GetNodes()))
|
||||
|
||||
filteredChannels := filterChannels(store, channels)
|
||||
if len(filteredChannels) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(hashring.Members()) == 0 {
|
||||
if len(hashRing.Members()) == 0 {
|
||||
opSet := ChannelOpSet{}
|
||||
opSet.Add(bufferID, channels)
|
||||
return opSet
|
||||
|
@ -219,13 +219,13 @@ func ConsistentHashChannelAssignPolicy(hashring *consistent.Consistent) ChannelA
|
|||
|
||||
adds := make(map[int64][]*channel)
|
||||
for _, c := range filteredChannels {
|
||||
idstr, err := hashring.Get(c.Name)
|
||||
idStr, err := hashRing.Get(c.Name)
|
||||
if err != nil {
|
||||
log.Warn("receive error when getting from hashring",
|
||||
log.Warn("receive error when getting from hashRing",
|
||||
zap.String("channel", c.Name), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
did, err := deformatNodeID(idstr)
|
||||
did, err := deformatNodeID(idStr)
|
||||
if err != nil {
|
||||
log.Warn("failed to deformat node id", zap.Int64("nodeID", did))
|
||||
return nil
|
||||
|
@ -316,9 +316,9 @@ func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) ChannelOp
|
|||
}
|
||||
|
||||
// ConsistentHashDeregisterPolicy return a DeregisterPolicy that uses consistent hash
|
||||
func ConsistentHashDeregisterPolicy(hashring *consistent.Consistent) DeregisterPolicy {
|
||||
func ConsistentHashDeregisterPolicy(hashRing *consistent.Consistent) DeregisterPolicy {
|
||||
return func(store ROChannelStore, nodeID int64) ChannelOpSet {
|
||||
hashring.Set(formatNodeIDsWithFilter(store.GetNodes(), nodeID))
|
||||
hashRing.Set(formatNodeIDsWithFilter(store.GetNodes(), nodeID))
|
||||
channels := store.GetNodesChannels()
|
||||
opSet := ChannelOpSet{}
|
||||
var deletedInfo *NodeChannelInfo
|
||||
|
@ -337,7 +337,7 @@ func ConsistentHashDeregisterPolicy(hashring *consistent.Consistent) DeregisterP
|
|||
opSet.Delete(nodeID, deletedInfo.Channels)
|
||||
|
||||
// If no members in hash ring, store channels in buffer
|
||||
if len(hashring.Members()) == 0 {
|
||||
if len(hashRing.Members()) == 0 {
|
||||
opSet.Add(bufferID, deletedInfo.Channels)
|
||||
return opSet
|
||||
}
|
||||
|
@ -345,15 +345,15 @@ func ConsistentHashDeregisterPolicy(hashring *consistent.Consistent) DeregisterP
|
|||
// reassign channels of deleted node
|
||||
updates := make(map[int64][]*channel)
|
||||
for _, c := range deletedInfo.Channels {
|
||||
idstr, err := hashring.Get(c.Name)
|
||||
idStr, err := hashRing.Get(c.Name)
|
||||
if err != nil {
|
||||
log.Warn("failed to get channel in hash ring", zap.String("channel", c.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
did, err := deformatNodeID(idstr)
|
||||
did, err := deformatNodeID(idStr)
|
||||
if err != nil {
|
||||
log.Warn("failed to deformat id", zap.String("id", idstr))
|
||||
log.Warn("failed to deformat id", zap.String("id", idStr))
|
||||
}
|
||||
|
||||
updates[did] = append(updates[did], c)
|
||||
|
@ -437,7 +437,7 @@ func EmptyBgChecker(channels []*NodeChannelInfo, ts time.Time) ([]*NodeChannelIn
|
|||
// BgCheckWithMaxWatchDuration returns a ChannelBGChecker with the maxWatchDuration
|
||||
func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker {
|
||||
return func(channels []*NodeChannelInfo, ts time.Time) ([]*NodeChannelInfo, error) {
|
||||
reallocations := make([]*NodeChannelInfo, 0, len(channels))
|
||||
reAllocations := make([]*NodeChannelInfo, 0, len(channels))
|
||||
for _, ch := range channels {
|
||||
cinfo := &NodeChannelInfo{
|
||||
NodeID: ch.NodeID,
|
||||
|
@ -465,10 +465,10 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker {
|
|||
}
|
||||
}
|
||||
if len(cinfo.Channels) != 0 {
|
||||
reallocations = append(reallocations, cinfo)
|
||||
reAllocations = append(reAllocations, cinfo)
|
||||
}
|
||||
}
|
||||
return reallocations, nil
|
||||
return reAllocations, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue