mirror of https://github.com/milvus-io/milvus.git
Fix data service startup bug
Signed-off-by: sunby <bingyi.sun@zilliz.com>pull/4973/head^2
parent
05b013c52a
commit
b4bcaaf847
|
@ -10,4 +10,4 @@ dataservice:
|
|||
# old name: segmentExpireDuration: 2000
|
||||
IDAssignExpiration: 2000 # ms
|
||||
insertChannelNumPerCollection: 4
|
||||
dataNodeNum: 2
|
||||
dataNodeNum: 1
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -39,6 +38,7 @@ type (
|
|||
FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
|
||||
|
||||
SetMasterServiceInterface(ms MasterServiceInterface) error
|
||||
|
||||
SetDataServiceInterface(ds DataServiceInterface) error
|
||||
}
|
||||
|
||||
|
@ -55,6 +55,7 @@ type (
|
|||
}
|
||||
|
||||
DataNode struct {
|
||||
// GOOSE TODO: complete interface with component
|
||||
ctx context.Context
|
||||
NodeID UniqueID
|
||||
Role string
|
||||
|
@ -79,8 +80,8 @@ func NewDataNode(ctx context.Context) *DataNode {
|
|||
Params.Init()
|
||||
node := &DataNode{
|
||||
ctx: ctx,
|
||||
NodeID: Params.NodeID, // GOOSE TODO
|
||||
Role: typeutil.DataNodeRole, // GOOSE TODO
|
||||
NodeID: Params.NodeID, // GOOSE TODO
|
||||
Role: "DataNode", // GOOSE TODO
|
||||
State: internalpb2.StateCode_INITIALIZING,
|
||||
dataSyncService: nil,
|
||||
metaService: nil,
|
||||
|
@ -117,10 +118,11 @@ func (node *DataNode) Init() error {
|
|||
|
||||
resp, err := node.dataService.RegisterNode(req)
|
||||
if err != nil {
|
||||
return errors.Errorf("Register node failed: %v", err)
|
||||
return errors.Errorf("Init failed: %v", err)
|
||||
}
|
||||
|
||||
for _, kv := range resp.InitParams.StartParams {
|
||||
log.Println(kv)
|
||||
switch kv.Key {
|
||||
case "DDChannelName":
|
||||
Params.DDChannelNames = []string{kv.Value}
|
||||
|
@ -148,7 +150,7 @@ func (node *DataNode) Init() error {
|
|||
node.metaService = newMetaService(node.ctx, replica, node.masterService)
|
||||
node.replica = replica
|
||||
|
||||
// --- Opentracing ---
|
||||
// Opentracing
|
||||
cfg := &config.Configuration{
|
||||
ServiceName: "data_node",
|
||||
Sampler: &config.SamplerConfig{
|
||||
|
@ -165,6 +167,7 @@ func (node *DataNode) Init() error {
|
|||
}
|
||||
node.tracer = tracer
|
||||
node.closer = closer
|
||||
|
||||
opentracing.SetGlobalTracer(node.tracer)
|
||||
|
||||
return nil
|
||||
|
|
|
@ -27,7 +27,6 @@ func newMetaService(ctx context.Context, replica collectionReplica, m MasterServ
|
|||
}
|
||||
|
||||
func (mService *metaService) init() {
|
||||
log.Println("Initing meta ...")
|
||||
err := mService.loadCollections()
|
||||
if err != nil {
|
||||
log.Fatal("metaService init failed:", err)
|
||||
|
|
|
@ -41,7 +41,7 @@ func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster {
|
|||
func (c *dataNodeCluster) Register(ip string, port int64, id int64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if !c.checkDataNodeNotExist(ip, port) {
|
||||
if c.checkDataNodeNotExist(ip, port) {
|
||||
c.nodes = append(c.nodes, &dataNode{
|
||||
id: id,
|
||||
address: struct {
|
||||
|
@ -50,9 +50,9 @@ func (c *dataNodeCluster) Register(ip string, port int64, id int64) {
|
|||
}{ip: ip, port: port},
|
||||
channelNum: 0,
|
||||
})
|
||||
}
|
||||
if len(c.nodes) == Params.DataNodeNum {
|
||||
close(c.finishCh)
|
||||
if len(c.nodes) == Params.DataNodeNum {
|
||||
close(c.finishCh)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"sync"
|
||||
|
||||
dn "github.com/zilliztech/milvus-distributed/internal/datanode"
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
|
@ -66,6 +67,11 @@ func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error {
|
|||
}
|
||||
|
||||
func (s *Server) Init() error {
|
||||
err := s.core.Init()
|
||||
if err != nil {
|
||||
return errors.Errorf("Init failed: %v", err)
|
||||
}
|
||||
|
||||
return s.core.Init()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue