mirror of https://github.com/milvus-io/milvus.git
Fix datanode watchDmChannel bug
Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>pull/4973/head^2
parent
d6200a5196
commit
560d2350ae
|
@ -56,11 +56,12 @@ type (
|
|||
}
|
||||
|
||||
DataNode struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
NodeID UniqueID
|
||||
Role string
|
||||
State internalpb2.StateCode
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
NodeID UniqueID
|
||||
Role string
|
||||
State internalpb2.StateCode
|
||||
watchDm chan struct{}
|
||||
|
||||
dataSyncService *dataSyncService
|
||||
metaService *metaService
|
||||
|
@ -81,11 +82,13 @@ func NewDataNode(ctx context.Context) *DataNode {
|
|||
Params.Init()
|
||||
ctx2, cancel2 := context.WithCancel(ctx)
|
||||
node := &DataNode{
|
||||
ctx: ctx2,
|
||||
cancel: cancel2,
|
||||
NodeID: Params.NodeID, // GOOSE TODO: How to init
|
||||
Role: typeutil.DataNodeRole,
|
||||
State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
|
||||
ctx: ctx2,
|
||||
cancel: cancel2,
|
||||
NodeID: Params.NodeID, // GOOSE TODO: How to init
|
||||
Role: typeutil.DataNodeRole,
|
||||
State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
|
||||
watchDm: make(chan struct{}),
|
||||
|
||||
dataSyncService: nil,
|
||||
metaService: nil,
|
||||
masterService: nil,
|
||||
|
@ -135,6 +138,13 @@ func (node *DataNode) Init() error {
|
|||
return errors.Errorf("Register node failed: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(RPCConnectionTimeout):
|
||||
return errors.New("Get DmChannels failed in 30 seconds")
|
||||
case <-node.watchDm:
|
||||
log.Println("insert channel names set")
|
||||
}
|
||||
|
||||
for _, kv := range resp.InitParams.StartParams {
|
||||
switch kv.Key {
|
||||
case "DDChannelName":
|
||||
|
@ -162,10 +172,10 @@ func (node *DataNode) Init() error {
|
|||
node.flushChan = make(chan *flushMsg, chanSize)
|
||||
|
||||
node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc)
|
||||
node.dataSyncService.init()
|
||||
node.metaService = newMetaService(node.ctx, replica, node.masterService)
|
||||
|
||||
node.replica = replica
|
||||
node.dataSyncService.initNodes()
|
||||
|
||||
// --- Opentracing ---
|
||||
cfg := &config.Configuration{
|
||||
|
@ -191,14 +201,9 @@ func (node *DataNode) Init() error {
|
|||
|
||||
func (node *DataNode) Start() error {
|
||||
node.metaService.init()
|
||||
return nil
|
||||
}
|
||||
|
||||
// DataNode is HEALTHY until StartSync() is called
|
||||
func (node *DataNode) StartSync() {
|
||||
node.dataSyncService.init()
|
||||
go node.dataSyncService.start()
|
||||
node.State = internalpb2.StateCode_HEALTHY
|
||||
return nil
|
||||
}
|
||||
|
||||
func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
|
||||
|
@ -219,7 +224,7 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*common
|
|||
default:
|
||||
Params.InsertChannelNames = in.GetChannelNames()
|
||||
status.ErrorCode = commonpb.ErrorCode_SUCCESS
|
||||
node.StartSync()
|
||||
node.watchDm <- struct{}{}
|
||||
return status, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,7 +76,6 @@ func (s *Server) Start() error {
|
|||
|
||||
func (s *Server) Stop() error {
|
||||
err := s.core.Stop()
|
||||
s.cancel()
|
||||
s.grpcServer.GracefulStop()
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -20,19 +20,20 @@ type Cache interface {
|
|||
GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error)
|
||||
GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error)
|
||||
RemoveCollection(collectionName string)
|
||||
RemovePartition(collectionName string, partitionName string)
|
||||
RemovePartition(partitionName string)
|
||||
}
|
||||
|
||||
type collectionInfo struct {
|
||||
collID typeutil.UniqueID
|
||||
schema *schemapb.CollectionSchema
|
||||
partInfo map[string]typeutil.UniqueID
|
||||
collID typeutil.UniqueID
|
||||
schema *schemapb.CollectionSchema
|
||||
}
|
||||
|
||||
type MetaCache struct {
|
||||
client MasterClientInterface
|
||||
|
||||
collInfo map[string]*collectionInfo
|
||||
partInfo map[string]typeutil.UniqueID
|
||||
col2par map[string][]string
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
|
@ -51,6 +52,8 @@ func NewMetaCache(client MasterClientInterface) (*MetaCache, error) {
|
|||
return &MetaCache{
|
||||
client: client,
|
||||
collInfo: map[string]*collectionInfo{},
|
||||
partInfo: map[string]typeutil.UniqueID{},
|
||||
col2par: map[string][]string{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -76,16 +79,11 @@ func (m *MetaCache) readCollectionSchema(collectionName string) (*schemapb.Colle
|
|||
return collInfo.schema, nil
|
||||
}
|
||||
|
||||
func (m *MetaCache) readPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
|
||||
func (m *MetaCache) readPartitionID(partitionName string) (typeutil.UniqueID, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
collInfo, ok := m.collInfo[collectionName]
|
||||
if !ok {
|
||||
return 0, errors.Errorf("can't find collection name:%s", collectionName)
|
||||
}
|
||||
|
||||
partitionID, ok := collInfo.partInfo[partitionName]
|
||||
partitionID, ok := m.partInfo[partitionName]
|
||||
if !ok {
|
||||
return 0, errors.Errorf("can't find partition name:%s", partitionName)
|
||||
}
|
||||
|
@ -114,14 +112,15 @@ func (m *MetaCache) GetCollectionID(collectionName string) (typeutil.UniqueID, e
|
|||
return 0, errors.Errorf("%s", coll.Status.Reason)
|
||||
}
|
||||
|
||||
collInfo := &collectionInfo{
|
||||
collID: coll.CollectionID,
|
||||
schema: coll.Schema,
|
||||
}
|
||||
_, ok := m.collInfo[collectionName]
|
||||
if !ok {
|
||||
m.collInfo[collectionName] = &collectionInfo{}
|
||||
m.collInfo[collectionName] = collInfo
|
||||
}
|
||||
m.collInfo[collectionName].schema = coll.Schema
|
||||
m.collInfo[collectionName].collID = coll.CollectionID
|
||||
|
||||
return m.collInfo[collectionName].collID, nil
|
||||
return collInfo.collID, nil
|
||||
}
|
||||
func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error) {
|
||||
collSchema, err := m.readCollectionSchema(collectionName)
|
||||
|
@ -145,18 +144,19 @@ func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.Collec
|
|||
return nil, errors.Errorf("%s", coll.Status.Reason)
|
||||
}
|
||||
|
||||
collInfo := &collectionInfo{
|
||||
collID: coll.CollectionID,
|
||||
schema: coll.Schema,
|
||||
}
|
||||
_, ok := m.collInfo[collectionName]
|
||||
if !ok {
|
||||
m.collInfo[collectionName] = &collectionInfo{}
|
||||
m.collInfo[collectionName] = collInfo
|
||||
}
|
||||
m.collInfo[collectionName].schema = coll.Schema
|
||||
m.collInfo[collectionName].collID = coll.CollectionID
|
||||
|
||||
return m.collInfo[collectionName].schema, nil
|
||||
return collInfo.schema, nil
|
||||
}
|
||||
|
||||
func (m *MetaCache) GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
|
||||
partitionID, err := m.readPartitionID(collectionName, partitionName)
|
||||
partitionID, err := m.readPartitionID(partitionName)
|
||||
if err == nil {
|
||||
return partitionID, nil
|
||||
}
|
||||
|
@ -180,45 +180,34 @@ func (m *MetaCache) GetPartitionID(collectionName string, partitionName string)
|
|||
return 0, errors.Errorf("partition ids len: %d doesn't equal Partition name len %d",
|
||||
len(partitions.PartitionIDs), len(partitions.PartitionNames))
|
||||
}
|
||||
|
||||
_, ok := m.collInfo[collectionName]
|
||||
if !ok {
|
||||
m.collInfo[collectionName] = &collectionInfo{
|
||||
partInfo: map[string]typeutil.UniqueID{},
|
||||
}
|
||||
}
|
||||
partInfo := m.collInfo[collectionName].partInfo
|
||||
m.col2par[collectionName] = partitions.PartitionNames
|
||||
|
||||
for i := 0; i < len(partitions.PartitionIDs); i++ {
|
||||
_, ok := partInfo[partitions.PartitionNames[i]]
|
||||
_, ok := m.partInfo[partitions.PartitionNames[i]]
|
||||
if !ok {
|
||||
partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
|
||||
m.partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
|
||||
}
|
||||
}
|
||||
_, ok = partInfo[partitionName]
|
||||
_, ok := m.partInfo[partitionName]
|
||||
if !ok {
|
||||
return 0, errors.Errorf("partitionID of partitionName:%s can not be find", partitionName)
|
||||
}
|
||||
return m.partInfo[partitionName], nil
|
||||
|
||||
return partInfo[partitionName], nil
|
||||
}
|
||||
|
||||
func (m *MetaCache) RemoveCollection(collectionName string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.collInfo, collectionName)
|
||||
for _, partitionName := range m.col2par[collectionName] {
|
||||
delete(m.partInfo, partitionName)
|
||||
}
|
||||
delete(m.col2par, collectionName)
|
||||
}
|
||||
|
||||
func (m *MetaCache) RemovePartition(collectionName, partitionName string) {
|
||||
func (m *MetaCache) RemovePartition(partitionName string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
_, ok := m.collInfo[collectionName]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
partInfo := m.collInfo[collectionName].partInfo
|
||||
if partInfo == nil {
|
||||
return
|
||||
}
|
||||
delete(partInfo, partitionName)
|
||||
delete(m.partInfo, partitionName)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue