mirror of https://github.com/milvus-io/milvus.git
stop add new node into cluster before validated (#6241)
* stop add new new into cluster Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> * Fix startup & test cases Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/6252/head
parent
a518e408b1
commit
580e3a57cf
|
@ -95,17 +95,18 @@ func newCluster(ctx context.Context, dataManager *clusterNodeManager,
|
|||
assignPolicy: defaultAssignPolicy(),
|
||||
}
|
||||
|
||||
c.candidateManager = newCandidateManager(20, c.validateDataNode, c.enableDataNode)
|
||||
|
||||
for _, opt := range opts {
|
||||
opt.apply(c)
|
||||
}
|
||||
|
||||
c.candidateManager = newCandidateManager(20, c.validateDataNode, c.enableDataNode)
|
||||
return c
|
||||
}
|
||||
|
||||
// startup applies statup policy
|
||||
func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
|
||||
deltaChange := c.dataManager.updateCluster(dataNodes)
|
||||
/*deltaChange := c.dataManager.updateCluster(dataNodes)
|
||||
nodes, chanBuffer := c.dataManager.getDataNodes(false)
|
||||
var rets []*datapb.DataNodeInfo
|
||||
var err error
|
||||
|
@ -117,7 +118,8 @@ func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
|
|||
//does not trigger new another refresh, pending evt will do
|
||||
}
|
||||
c.dataManager.updateDataNodes(rets, chanBuffer)
|
||||
return nil
|
||||
return nil*/
|
||||
return c.refresh(dataNodes)
|
||||
}
|
||||
|
||||
// refresh rough refresh datanode status after event received
|
||||
|
@ -150,8 +152,10 @@ func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
_, buffer := c.dataManager.getDataNodes(true)
|
||||
c.updateNodeWatch(restartNodes, buffer)
|
||||
if len(restartNodes) > 0 {
|
||||
_, buffer := c.dataManager.getDataNodes(true)
|
||||
c.updateNodeWatch(restartNodes, buffer)
|
||||
}
|
||||
|
||||
// 3. offline do unregister
|
||||
unregisterNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.offlines)) // possible nodes info to unregister
|
||||
|
|
|
@ -97,15 +97,6 @@ func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *cl
|
|||
continue
|
||||
}
|
||||
|
||||
newNode := &dataNodeInfo{
|
||||
info: &datapb.DataNodeInfo{
|
||||
Address: n.Address,
|
||||
Version: n.Version,
|
||||
Channels: []*datapb.ChannelStatus{},
|
||||
},
|
||||
status: online,
|
||||
}
|
||||
c.dataNodes[n.Address] = newNode
|
||||
newNodes = append(newNodes, n.Address)
|
||||
}
|
||||
|
||||
|
@ -166,6 +157,7 @@ func (c *clusterNodeManager) unregister(addr string) *datapb.DataNodeInfo {
|
|||
if !ok {
|
||||
return nil
|
||||
}
|
||||
delete(c.dataNodes, addr)
|
||||
node.status = offline
|
||||
c.updateMetrics()
|
||||
return node.info
|
||||
|
|
|
@ -12,6 +12,8 @@ package datacoord
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
||||
|
@ -21,7 +23,8 @@ import (
|
|||
|
||||
func TestClusterCreate(t *testing.T) {
|
||||
cPolicy := newMockStartupPolicy()
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy))
|
||||
ch := make(chan struct{}, 1)
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
|
||||
addr := "localhost:8080"
|
||||
nodes := []*datapb.DataNodeInfo{
|
||||
{
|
||||
|
@ -32,6 +35,7 @@ func TestClusterCreate(t *testing.T) {
|
|||
}
|
||||
err := cluster.startup(nodes)
|
||||
assert.Nil(t, err)
|
||||
<-ch
|
||||
dataNodes, _ := cluster.dataManager.getDataNodes(true)
|
||||
assert.EqualValues(t, 1, len(dataNodes))
|
||||
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
|
||||
|
@ -58,7 +62,8 @@ func TestRegister(t *testing.T) {
|
|||
func TestUnregister(t *testing.T) {
|
||||
cPolicy := newMockStartupPolicy()
|
||||
unregisterPolicy := newEmptyUnregisterPolicy()
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), withUnregistorPolicy(unregisterPolicy))
|
||||
ch := make(chan struct{}, 1)
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), withUnregistorPolicy(unregisterPolicy), mockValidatorOption(ch))
|
||||
addr := "localhost:8080"
|
||||
nodes := []*datapb.DataNodeInfo{
|
||||
{
|
||||
|
@ -69,6 +74,7 @@ func TestUnregister(t *testing.T) {
|
|||
}
|
||||
err := cluster.startup(nodes)
|
||||
assert.Nil(t, err)
|
||||
<-ch
|
||||
dataNodes, _ := cluster.dataManager.getDataNodes(true)
|
||||
assert.EqualValues(t, 1, len(dataNodes))
|
||||
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
|
||||
|
@ -78,14 +84,75 @@ func TestUnregister(t *testing.T) {
|
|||
Channels: []*datapb.ChannelStatus{},
|
||||
})
|
||||
dataNodes, _ = cluster.dataManager.getDataNodes(false)
|
||||
assert.EqualValues(t, 1, len(dataNodes))
|
||||
assert.EqualValues(t, offline, cluster.dataManager.dataNodes[addr].status)
|
||||
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
|
||||
assert.EqualValues(t, 0, len(dataNodes))
|
||||
}
|
||||
|
||||
func TestRefresh(t *testing.T) {
|
||||
cPolicy := newMockStartupPolicy()
|
||||
ch := make(chan struct{}, 1)
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), clusterOption{
|
||||
apply: func(c *cluster) {
|
||||
c.candidateManager.validate = func(dn *datapb.DataNodeInfo) error {
|
||||
if strings.Contains(dn.Address, "inv") {
|
||||
return errors.New("invalid dn")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
c.candidateManager.enable = func(dn *datapb.DataNodeInfo) error {
|
||||
err := c.enableDataNode(dn)
|
||||
ch <- struct{}{}
|
||||
return err
|
||||
}
|
||||
},
|
||||
})
|
||||
addr := "localhost:8080"
|
||||
nodes := []*datapb.DataNodeInfo{
|
||||
{
|
||||
Address: addr,
|
||||
Version: 1,
|
||||
Channels: []*datapb.ChannelStatus{},
|
||||
},
|
||||
{
|
||||
Address: addr + "invalid",
|
||||
Version: 1,
|
||||
Channels: []*datapb.ChannelStatus{},
|
||||
},
|
||||
}
|
||||
err := cluster.startup(nodes)
|
||||
assert.Nil(t, err)
|
||||
<-ch
|
||||
dataNodes, _ := cluster.dataManager.getDataNodes(true)
|
||||
if !assert.Equal(t, 1, len(dataNodes)) {
|
||||
t.FailNow()
|
||||
}
|
||||
assert.Equal(t, addr, dataNodes[addr].Address)
|
||||
addr2 := "localhost:8081"
|
||||
nodes = []*datapb.DataNodeInfo{
|
||||
{
|
||||
Address: addr2,
|
||||
Version: 1,
|
||||
Channels: []*datapb.ChannelStatus{},
|
||||
},
|
||||
{
|
||||
Address: addr2 + "invalid",
|
||||
Version: 1,
|
||||
Channels: []*datapb.ChannelStatus{},
|
||||
},
|
||||
}
|
||||
err = cluster.refresh(nodes)
|
||||
assert.Nil(t, err)
|
||||
<-ch
|
||||
dataNodes, _ = cluster.dataManager.getDataNodes(true)
|
||||
assert.Equal(t, 1, len(dataNodes))
|
||||
_, has := dataNodes[addr]
|
||||
assert.False(t, has)
|
||||
assert.Equal(t, addr2, dataNodes[addr2].Address)
|
||||
}
|
||||
|
||||
func TestWatchIfNeeded(t *testing.T) {
|
||||
cPolicy := newMockStartupPolicy()
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy))
|
||||
ch := make(chan struct{}, 1)
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
|
||||
addr := "localhost:8080"
|
||||
nodes := []*datapb.DataNodeInfo{
|
||||
{
|
||||
|
@ -96,6 +163,7 @@ func TestWatchIfNeeded(t *testing.T) {
|
|||
}
|
||||
err := cluster.startup(nodes)
|
||||
assert.Nil(t, err)
|
||||
<-ch
|
||||
dataNodes, _ := cluster.dataManager.getDataNodes(true)
|
||||
assert.EqualValues(t, 1, len(dataNodes))
|
||||
assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
|
||||
|
@ -112,7 +180,8 @@ func TestWatchIfNeeded(t *testing.T) {
|
|||
|
||||
func TestFlushSegments(t *testing.T) {
|
||||
cPolicy := newMockStartupPolicy()
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy))
|
||||
ch := make(chan struct{}, 1)
|
||||
cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
|
||||
addr := "localhost:8080"
|
||||
nodes := []*datapb.DataNodeInfo{
|
||||
{
|
||||
|
@ -123,6 +192,7 @@ func TestFlushSegments(t *testing.T) {
|
|||
}
|
||||
err := cluster.startup(nodes)
|
||||
assert.Nil(t, err)
|
||||
<-ch
|
||||
segments := []*datapb.SegmentInfo{
|
||||
{
|
||||
ID: 0,
|
||||
|
@ -134,6 +204,21 @@ func TestFlushSegments(t *testing.T) {
|
|||
cluster.flush(segments)
|
||||
}
|
||||
|
||||
func mockValidatorOption(ch chan<- struct{}) clusterOption {
|
||||
return clusterOption{
|
||||
apply: func(c *cluster) {
|
||||
c.candidateManager.validate = func(dn *datapb.DataNodeInfo) error {
|
||||
return nil
|
||||
}
|
||||
c.candidateManager.enable = func(dn *datapb.DataNodeInfo) error {
|
||||
err := c.enableDataNode(dn)
|
||||
ch <- struct{}{}
|
||||
return err
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func createCluster(t *testing.T, ch chan interface{}, options ...clusterOption) *cluster {
|
||||
kv := memkv.NewMemoryKV()
|
||||
sessionManager := newMockSessionManager(ch)
|
||||
|
|
Loading…
Reference in New Issue