Enable multiple datanode (#5068)

Multiple datanodes can now be started; however, they will all process the same insert data. Further changes are needed to truly enable multiple datanodes:

While registering, DataService needs to return different insert channels for different datanodes.
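
For illustration only, here is a minimal sketch of that follow-up idea, assuming a simple round-robin assignment; the type and method names are hypothetical and not part of the actual DataService API:

```go
// Hypothetical sketch: a registration path that hands each datanode a
// distinct insert channel instead of the same one.
package main

import (
	"fmt"
	"sync"
)

// channelAllocator assigns disjoint insert channels to registering datanodes.
type channelAllocator struct {
	mu       sync.Mutex
	channels []string           // all insert channels managed by the service
	assigned map[int64][]string // nodeID -> channels already handed out
	next     int
}

func newChannelAllocator(channels []string) *channelAllocator {
	return &channelAllocator{channels: channels, assigned: make(map[int64][]string)}
}

// Register returns the insert channels for nodeID, picking the next channel
// round-robin so no two datanodes consume the same insert stream.
func (a *channelAllocator) Register(nodeID int64) []string {
	a.mu.Lock()
	defer a.mu.Unlock()
	if chs, ok := a.assigned[nodeID]; ok {
		return chs // idempotent on re-registration
	}
	ch := a.channels[a.next%len(a.channels)]
	a.next++
	a.assigned[nodeID] = []string{ch}
	return a.assigned[nodeID]
}

func main() {
	alloc := newChannelAllocator([]string{"insert-ch-0", "insert-ch-1", "insert-ch-2"})
	for _, id := range []int64{1, 2, 3} {
		fmt.Printf("datanode %d -> %v\n", id, alloc.Register(id))
	}
}
```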

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2021-04-28 18:43:48 +08:00 committed by GitHub
parent 004598f796
commit cebdfb34f7
10 changed files with 29 additions and 75 deletions

.gitignore

@@ -59,7 +59,7 @@ cmake_build/
*.log
.DS_Store
*.swp
*.sw[po]
cwrapper_build
**/cwrapper_rocksdb_build/
**/.clangd/*


@@ -12,7 +12,6 @@
nodeID: # will be deprecated later
queryNodeIDList: [1]
dataNodeIDList: [3]
etcd:
address: localhost

go.mod

@@ -11,7 +11,7 @@ require (
github.com/frankban/quicktest v1.10.2 // indirect
github.com/go-basic/ipv4 v1.0.0
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.3.2
github.com/golang/protobuf v1.4.2
github.com/google/btree v1.0.0
github.com/jarcoal/httpmock v1.0.8
github.com/klauspost/compress v1.10.11 // indirect


@@ -22,9 +22,6 @@ import (
)
type ParamTable struct {
// === PRIVATE Configs ===
dataNodeIDList []UniqueID
paramtable.BaseTable
// === DataNode Internal Components Configs ===
@@ -126,21 +123,17 @@ func (p *ParamTable) Init() {
// ==== DataNode internal components configs ====
func (p *ParamTable) initNodeID() {
p.dataNodeIDList = p.DataNodeIDList()
dataNodeIDStr := os.Getenv("DATA_NODE_ID")
if dataNodeIDStr == "" {
if len(p.dataNodeIDList) <= 0 {
dataNodeIDStr = "0"
} else {
dataNodeIDStr = strconv.Itoa(int(p.dataNodeIDList[0]))
}
dataNodeIDStr = "1"
}
err := p.Save("_dataNodeID", dataNodeIDStr)
dnID, err := strconv.Atoi(dataNodeIDStr)
if err != nil {
panic(err)
}
p.NodeID = p.ParseInt64("_dataNodeID")
p.NodeID = UniqueID(dnID)
}
// ---- flowgraph configs ----
@@ -283,17 +276,6 @@ func (p *ParamTable) initMinioBucketName() {
p.MinioBucketName = bucketName
}
func (p *ParamTable) sliceIndex() int {
dataNodeID := p.NodeID
dataNodeIDList := p.dataNodeIDList
for i := 0; i < len(dataNodeIDList); i++ {
if dataNodeID == dataNodeIDList[i] {
return i
}
}
return -1
}
func (p *ParamTable) initLogCfg() {
p.Log = log.Config{}
format, err := p.Load("log.format")


@@ -124,9 +124,4 @@ func TestParamTable_DataNode(t *testing.T) {
name := Params.MinioBucketName
log.Println("MinioBucketName:", name)
})
t.Run("Test sliceIndex", func(t *testing.T) {
idx := Params.sliceIndex()
log.Println("sliceIndex:", idx)
})
}


@@ -433,11 +433,6 @@ func TestChannel(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, 345))
err := statsStream.Produce(&msgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
segInfo, err = svr.meta.GetSegment(segID)
assert.Nil(t, err)
assert.Equal(t, rowNum, segInfo.NumRows)
})
t.Run("Test SegmentFlushChannel", func(t *testing.T) {


@@ -12,10 +12,13 @@
package grpcdatanode
import (
"net"
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
)
var Params ParamTable
@@ -24,8 +27,9 @@ var once sync.Once
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
IP string
Port int
listener net.Listener
MasterAddress string
DataServiceAddress string
@@ -49,8 +53,15 @@ func (pt *ParamTable) LoadFromEnv() {
}
func (pt *ParamTable) initPort() {
port := pt.ParseInt("dataNode.port")
pt.Port = port
listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
pt.Port = listener.Addr().(*net.TCPAddr).Port
pt.listener = listener
log.Info("DataNode", zap.Int("port", pt.Port))
}
func (pt *ParamTable) initMasterAddress() {
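
For reference, the ephemeral-port trick used in initPort above can be reproduced in isolation; this is a minimal standalone sketch of why binding to ":0" lets several DataNode instances share one host without colliding on a configured port:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Ask the OS for any free TCP port by binding to ":0".
	listener, err := net.Listen("tcp", ":0")
	if err != nil {
		panic(err)
	}
	defer listener.Close()

	// The assigned port is readable from the listener's address.
	port := listener.Addr().(*net.TCPAddr).Port
	fmt.Println("assigned port:", port)

	// Keeping the listener open (and handing it to grpc.Server.Serve later,
	// as the change above does) reserves the port, so no other process can
	// grab it between allocation and server start.
}
```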


@@ -23,6 +23,9 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.Port, 0)
t.Logf("DataNode Port:%d", Params.Port)
assert.NotNil(t, Params.listener)
t.Logf("DataNode Port:%d", Params.Port)
assert.NotEqual(t, Params.DataServiceAddress, "")
t.Logf("DataServiceAddress:%s", Params.DataServiceAddress)


@@ -72,19 +72,9 @@ func New(ctx context.Context, factory msgstream.Factory) (*Server, error) {
return s, nil
}
func (s *Server) startGrpcLoop(grpcPort int) {
func (s *Server) startGrpcLoop(listener net.Listener) {
defer s.wg.Done()
addr := ":" + strconv.Itoa(grpcPort)
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Warn("GrpcServer failed to listen", zap.Error(err))
s.grpcErrChan <- err
return
}
log.Debug("DataNode address", zap.String("address", addr))
tracer := opentracing.GlobalTracer()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
@@ -99,7 +89,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
defer cancel()
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
if err := s.grpcServer.Serve(listener); err != nil {
log.Warn("DataNode Start Grpc Failed!")
s.grpcErrChan <- err
}
@@ -150,10 +140,6 @@ func (s *Server) Stop() error {
func (s *Server) init() error {
ctx := context.Background()
Params.Init()
if !funcutil.CheckPortAvailable(Params.Port) {
Params.Port = funcutil.GetAvailablePort()
log.Warn("DataNode init", zap.Any("Port", Params.Port))
}
Params.LoadFromEnv()
Params.LoadFromArgs()
@@ -161,13 +147,13 @@ func (s *Server) init() error {
dn.Params.Port = Params.Port
dn.Params.IP = Params.IP
log.Debug("DataNode port", zap.Int("port", Params.Port))
closer := trace.InitTracing(fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port))
s.closer = closer
addr := Params.IP + ":" + strconv.Itoa(Params.Port)
log.Debug("DataNode address", zap.String("address", addr))
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
go s.startGrpcLoop(Params.listener)
// wait for grpc server loop start
err := <-s.grpcErrChan
if err != nil {


@@ -329,23 +329,6 @@ func (gp *BaseTable) ParseInt(key string) int {
return value
}
func (gp *BaseTable) DataNodeIDList() []UniqueID {
datanodeIDStr, err := gp.Load("nodeID.dataNodeIDList")
if err != nil {
panic(err)
}
var ret []UniqueID
datanodeIDs := strings.Split(datanodeIDStr, ",")
for _, i := range datanodeIDs {
v, err := strconv.Atoi(i)
if err != nil {
log.Panicf("load write node id list error, %s", err.Error())
}
ret = append(ret, UniqueID(v))
}
return ret
}
func (gp *BaseTable) QueryNodeIDList() []UniqueID {
queryNodeIDStr, err := gp.Load("nodeID.queryNodeIDList")
if err != nil {