Fix bug for dataservice dockerfile

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/4973/head^2
cai.zhang 2021-02-04 18:11:19 +08:00 committed by yefu.chen
parent e0e8e1605e
commit 4f2a1e7912
15 changed files with 88 additions and 75 deletions

View File

@@ -5,7 +5,7 @@ services:
image: ${TARGET_REPO}/proxyservice:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/proxyservice/DockerFile
dockerfile: build/docker/deploy/proxyservice/Dockerfile
cache_from:
- ${SOURCE_REPO}/proxyservice:${SOURCE_TAG}
networks:
@@ -15,7 +15,7 @@ services:
image: ${TARGET_REPO}/proxynode:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/proxynode/DockerFile
dockerfile: build/docker/deploy/proxynode/Dockerfile
cache_from:
- ${SOURCE_REPO}/proxynode:${SOURCE_TAG}
environment:
@@ -30,7 +30,7 @@ services:
image: ${TARGET_REPO}/queryservice:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/queryservice/DockerFile
dockerfile: build/docker/deploy/queryservice/Dockerfile
cache_from:
- ${SOURCE_REPO}/queryservice:${SOURCE_TAG}
environment:
@@ -45,7 +45,7 @@ services:
image: ${TARGET_REPO}/querynode:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/querynode/DockerFile
dockerfile: build/docker/deploy/querynode/Dockerfile
cache_from:
- ${SOURCE_REPO}/querynode:${SOURCE_TAG}
environment:
@@ -60,7 +60,7 @@ services:
image: ${TARGET_REPO}/datanode:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/datanode/DockerFile
dockerfile: build/docker/deploy/datanode/Dockerfile
cache_from:
- ${SOURCE_REPO}/datanode:${SOURCE_TAG}
environment:
@@ -75,7 +75,7 @@ services:
image: ${TARGET_REPO}/indexservice:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/indexservice/DockerFile
dockerfile: build/docker/deploy/indexservice/Dockerfile
cache_from:
- ${SOURCE_REPO}/indexservice:${SOURCE_TAG}
environment:
@@ -89,7 +89,7 @@ services:
image: ${TARGET_REPO}/indexnode:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/indexnode/DockerFile
dockerfile: build/docker/deploy/indexnode/Dockerfile
cache_from:
- ${SOURCE_REPO}/indexnode:${SOURCE_TAG}
environment:

View File

@@ -94,6 +94,7 @@ type (
ddChannelName string
segmentInfoStream msgstream.MsgStream
insertChannels []string
ttBarrier timesync.TimeTickBarrier
}
)
@@ -177,23 +178,23 @@ func (s *Server) initSegmentInfoChannel() {
s.segmentInfoStream.Start()
}
func (s *Server) initMsgProducer() error {
var err error
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
ttMsgStream, _ := factory.NewMsgStream(s.ctx)
ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
s.ttMsgStream = ttMsgStream
s.ttMsgStream.Start()
timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
k2sStream, _ := factory.NewMsgStream(s.ctx)
k2sStream.AsProducer(Params.K2SChannelNames)
s.k2sMsgStream = k2sStream
s.k2sMsgStream.Start()
k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher)
if err != nil {
if s.ttMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
return err
}
s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
s.ttMsgStream.Start()
if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
return err
}
s.k2sMsgStream.AsProducer(Params.K2SChannelNames)
s.k2sMsgStream.Start()
dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
if s.msgProducer, err = timesync.NewTimeSyncMsgProducer(s.ttBarrier, dataNodeTTWatcher, k2sMsgWatcher); err != nil {
return err
}
s.msgProducer = producer
s.msgProducer.Start(s.ctx)
return nil
}
@@ -297,10 +298,11 @@ func (s *Server) checkMasterIsHealthy() error {
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(3)
s.serverLoopWg.Add(4)
go s.startStatsChannel(s.serverLoopCtx)
go s.startSegmentFlushChannel(s.serverLoopCtx)
go s.startDDChannel(s.serverLoopCtx)
go s.startTTBarrier(s.serverLoopCtx)
}
func (s *Server) startStatsChannel(ctx context.Context) {
@@ -388,6 +390,12 @@ func (s *Server) startDDChannel(ctx context.Context) {
}
}
func (s *Server) startTTBarrier(ctx context.Context) {
defer s.serverLoopWg.Done()
s.ttBarrier = timesync.NewHardTimeTickBarrier(ctx, s.ttMsgStream, s.cluster.GetNodeIDs())
s.ttBarrier.StartBackgroundLoop()
}
func (s *Server) waitDataNodeRegister() {
log.Println("waiting data node to register")
<-s.registerFinishCh
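
Note: the hunks above move the hard time-tick barrier out of initMsgProducer, propagate msg-stream creation errors instead of discarding them, and grow the server loop from three background goroutines to four (the new startTTBarrier). The sketch below is not part of the diff; it is a minimal, self-contained illustration of the Add(4)/Done pattern the new server loop relies on, with a stand-in server type whose loop names merely echo the real methods.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Stand-in for the data service server: four background loops, each releasing
// one WaitGroup slot on exit.
type server struct {
	wg sync.WaitGroup
}

func (s *server) startServerLoop(ctx context.Context) {
	s.wg.Add(4) // stats, segment flush, dd channel, and the new TT barrier loop
	go s.run(ctx, "statsChannel")
	go s.run(ctx, "segmentFlushChannel")
	go s.run(ctx, "ddChannel")
	go s.run(ctx, "ttBarrier")
}

func (s *server) run(ctx context.Context, name string) {
	defer s.wg.Done()
	<-ctx.Done() // every loop exits when the server-loop context is cancelled
	fmt.Println(name, "stopped")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &server{}
	s.startServerLoop(ctx)
	cancel()
	s.wg.Wait() // wait for all four goroutines to exit before shutting down
}
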

View File

@@ -20,20 +20,19 @@ type Cache interface {
GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error)
GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error)
RemoveCollection(collectionName string)
RemovePartition(partitionName string)
RemovePartition(collectionName string, partitionName string)
}
type collectionInfo struct {
collID typeutil.UniqueID
schema *schemapb.CollectionSchema
collID typeutil.UniqueID
schema *schemapb.CollectionSchema
partInfo map[string]typeutil.UniqueID
}
type MetaCache struct {
client MasterClientInterface
collInfo map[string]*collectionInfo
partInfo map[string]typeutil.UniqueID
col2par map[string][]string
mu sync.RWMutex
}
@@ -52,8 +51,6 @@ func NewMetaCache(client MasterClientInterface) (*MetaCache, error) {
return &MetaCache{
client: client,
collInfo: map[string]*collectionInfo{},
partInfo: map[string]typeutil.UniqueID{},
col2par: map[string][]string{},
}, nil
}
@@ -79,11 +76,16 @@ func (m *MetaCache) readCollectionSchema(collectionName string) (*schemapb.Colle
return collInfo.schema, nil
}
func (m *MetaCache) readPartitionID(partitionName string) (typeutil.UniqueID, error) {
func (m *MetaCache) readPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
m.mu.RLock()
defer m.mu.RUnlock()
partitionID, ok := m.partInfo[partitionName]
collInfo, ok := m.collInfo[collectionName]
if !ok {
return 0, errors.Errorf("can't find collection name:%s", collectionName)
}
partitionID, ok := collInfo.partInfo[partitionName]
if !ok {
return 0, errors.Errorf("can't find partition name:%s", partitionName)
}
@@ -112,15 +114,14 @@ func (m *MetaCache) GetCollectionID(collectionName string) (typeutil.UniqueID, e
return 0, errors.Errorf("%s", coll.Status.Reason)
}
collInfo := &collectionInfo{
collID: coll.CollectionID,
schema: coll.Schema,
}
_, ok := m.collInfo[collectionName]
if !ok {
m.collInfo[collectionName] = collInfo
m.collInfo[collectionName] = &collectionInfo{}
}
return collInfo.collID, nil
m.collInfo[collectionName].schema = coll.Schema
m.collInfo[collectionName].collID = coll.CollectionID
return m.collInfo[collectionName].collID, nil
}
func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error) {
collSchema, err := m.readCollectionSchema(collectionName)
@@ -144,19 +145,18 @@ func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.Collec
return nil, errors.Errorf("%s", coll.Status.Reason)
}
collInfo := &collectionInfo{
collID: coll.CollectionID,
schema: coll.Schema,
}
_, ok := m.collInfo[collectionName]
if !ok {
m.collInfo[collectionName] = collInfo
m.collInfo[collectionName] = &collectionInfo{}
}
return collInfo.schema, nil
m.collInfo[collectionName].schema = coll.Schema
m.collInfo[collectionName].collID = coll.CollectionID
return m.collInfo[collectionName].schema, nil
}
func (m *MetaCache) GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
partitionID, err := m.readPartitionID(partitionName)
partitionID, err := m.readPartitionID(collectionName, partitionName)
if err == nil {
return partitionID, nil
}
@@ -180,34 +180,45 @@ func (m *MetaCache) GetPartitionID(collectionName string, partitionName string)
return 0, errors.Errorf("partition ids len: %d doesn't equal Partition name len %d",
len(partitions.PartitionIDs), len(partitions.PartitionNames))
}
m.col2par[collectionName] = partitions.PartitionNames
for i := 0; i < len(partitions.PartitionIDs); i++ {
_, ok := m.partInfo[partitions.PartitionNames[i]]
if !ok {
m.partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
_, ok := m.collInfo[collectionName]
if !ok {
m.collInfo[collectionName] = &collectionInfo{
partInfo: map[string]typeutil.UniqueID{},
}
}
_, ok := m.partInfo[partitionName]
partInfo := m.collInfo[collectionName].partInfo
for i := 0; i < len(partitions.PartitionIDs); i++ {
_, ok := partInfo[partitions.PartitionNames[i]]
if !ok {
partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
}
}
_, ok = partInfo[partitionName]
if !ok {
return 0, errors.Errorf("partitionID of partitionName:%s can not be find", partitionName)
}
return m.partInfo[partitionName], nil
return partInfo[partitionName], nil
}
func (m *MetaCache) RemoveCollection(collectionName string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.collInfo, collectionName)
for _, partitionName := range m.col2par[collectionName] {
delete(m.partInfo, partitionName)
}
delete(m.col2par, collectionName)
}
func (m *MetaCache) RemovePartition(partitionName string) {
func (m *MetaCache) RemovePartition(collectionName, partitionName string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.partInfo, partitionName)
_, ok := m.collInfo[collectionName]
if !ok {
return
}
partInfo := m.collInfo[collectionName].partInfo
if partInfo == nil {
return
}
delete(partInfo, partitionName)
}
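
Note: this file replaces the flat partInfo and col2par maps with a per-collection partition map inside collectionInfo, so RemovePartition now needs both names and RemoveCollection drops a collection's partitions implicitly. The sketch below is not part of the diff; it uses simplified stand-in types to show that nested layout and why the extra col2par index is no longer needed.

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the diff's collectionInfo / MetaCache.
type collectionInfo struct {
	collID   int64
	partInfo map[string]int64 // partition name -> partition ID
}

type metaCache struct {
	mu       sync.RWMutex
	collInfo map[string]*collectionInfo
}

// Removing a partition now requires the collection name to reach the nested map.
func (m *metaCache) removePartition(collectionName, partitionName string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	info, ok := m.collInfo[collectionName]
	if !ok || info.partInfo == nil {
		return
	}
	delete(info.partInfo, partitionName)
}

// Removing a collection no longer needs a separate col2par index: the nested
// partition map disappears together with the collection entry.
func (m *metaCache) removeCollection(collectionName string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.collInfo, collectionName)
}

func main() {
	m := &metaCache{collInfo: map[string]*collectionInfo{
		"c1": {collID: 1, partInfo: map[string]int64{"p1": 10, "p2": 11}},
	}}
	m.removePartition("c1", "p1")
	fmt.Println(m.collInfo["c1"].partInfo) // map[p2:11]
	m.removeCollection("c1")
	fmt.Println(len(m.collInfo)) // 0
}

A side effect of this layout is that a collection and all of its cached partitions can be invalidated together under a single lock acquisition.
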

View File

@@ -63,19 +63,13 @@ func (producer *MsgProducer) broadcastMsg() {
func (producer *MsgProducer) Start(ctx context.Context) {
producer.ctx, producer.cancel = context.WithCancel(ctx)
producer.wg.Add(2 + len(producer.watchers))
go producer.startTTBarrier()
producer.wg.Add(1 + len(producer.watchers))
for _, watcher := range producer.watchers {
go producer.startWatcher(watcher)
}
go producer.broadcastMsg()
}
func (producer *MsgProducer) startTTBarrier() {
defer producer.wg.Done()
producer.ttBarrier.StartBackgroundLoop(producer.ctx)
}
func (producer *MsgProducer) startWatcher(watcher TimeTickWatcher) {
defer producer.wg.Done()
watcher.StartBackgroundLoop(producer.ctx)
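
Note: the producer no longer starts the time-tick barrier itself, so its WaitGroup only counts the broadcast loop and the watchers; the barrier is created and driven by the caller (the data service server loop above) and is only read through GetTimeTick. The sketch below is not part of the diff; it is a self-contained illustration of that ownership split, with stand-in types in place of the real msgstream/timesync ones.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Stand-in barrier interface: in the diff, the real TimeTickBarrier is created
// and started by the data service, not by the producer.
type timeTickBarrier interface {
	GetTimeTick() (uint64, error)
	StartBackgroundLoop()
}

type fakeBarrier struct{ tt uint64 }

func (b *fakeBarrier) GetTimeTick() (uint64, error) { b.tt++; return b.tt, nil }
func (b *fakeBarrier) StartBackgroundLoop()         {} // no-op here; the real loop runs in its owner's goroutine

// Stand-in producer: it only reads ticks, so its WaitGroup counts just the
// broadcast loop (plus watchers in the real code), with no slot for the barrier.
type msgProducer struct {
	barrier timeTickBarrier
	wg      sync.WaitGroup
}

func (p *msgProducer) start(ctx context.Context) {
	p.wg.Add(1) // broadcast loop only; the barrier is no longer the producer's job
	go func() {
		defer p.wg.Done()
		tt, _ := p.barrier.GetTimeTick() // one-shot read; the diff's loop runs until cancelled
		fmt.Println("broadcast time tick:", tt)
	}()
}

func main() {
	b := &fakeBarrier{}
	b.StartBackgroundLoop() // lifecycle owned by the caller, as in startTTBarrier above
	p := &msgProducer{barrier: b}
	p.start(context.Background())
	p.wg.Wait()
}
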

View File

@@ -18,7 +18,7 @@ type (
TimeTickBarrier interface {
GetTimeTick() (Timestamp, error)
StartBackgroundLoop(ctx context.Context)
StartBackgroundLoop()
}
softTimeTickBarrier struct {
@@ -38,7 +38,7 @@ type (
}
)
func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n")
return nil
@@ -49,6 +49,7 @@ sttbarrier.ttStream = ttStream
sttbarrier.ttStream = ttStream
sttbarrier.outTt = make(chan Timestamp, 1024)
sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp)
sttbarrier.ctx = ctx
for _, id := range peerIds {
sttbarrier.peer2LastTt[id] = Timestamp(0)
}
@@ -79,12 +80,11 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
}
}
func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
ttBarrier.ctx = ctx
func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop() {
for {
select {
case <-ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ctx.Err())
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
return
default:
}
@@ -137,14 +137,13 @@ func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) {
}
}
func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
ttBarrier.ctx = ctx
func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop() {
// Last timestamp synchronized
state := Timestamp(0)
for {
select {
case <-ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ctx.Err())
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
return
default:
}
@@ -188,7 +187,7 @@ func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
return tempMin
}
func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!")
return nil
@@ -199,6 +198,7 @@ func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTime
sttbarrier.outTt = make(chan Timestamp, 1024)
sttbarrier.peer2Tt = make(map[UniqueID]Timestamp)
sttbarrier.ctx = ctx
for _, id := range peerIds {
sttbarrier.peer2Tt[id] = Timestamp(0)
}