Fix bugs in dataservice (#5457)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/5779/head
sunby 2021-05-28 09:55:21 +08:00 committed by zhenshan.cao
parent 55eac3ee7f
commit a14c35274f
13 changed files with 221 additions and 221 deletions

View File

@ -19,22 +19,22 @@ import (
"github.com/milvus-io/milvus/internal/proto/masterpb"
)
type allocatorInterface interface {
type allocator interface {
allocTimestamp() (Timestamp, error)
allocID() (UniqueID, error)
}
type allocator struct {
type masterAllocator struct {
masterClient types.MasterService
}
func newAllocator(masterClient types.MasterService) *allocator {
return &allocator{
func newAllocator(masterClient types.MasterService) *masterAllocator {
return &masterAllocator{
masterClient: masterClient,
}
}
func (allocator *allocator) allocTimestamp() (Timestamp, error) {
func (allocator *masterAllocator) allocTimestamp() (Timestamp, error) {
ctx := context.TODO()
resp, err := allocator.masterClient.AllocTimestamp(ctx, &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
@ -51,7 +51,7 @@ func (allocator *allocator) allocTimestamp() (Timestamp, error) {
return resp.Timestamp, nil
}
func (allocator *allocator) allocID() (UniqueID, error) {
func (allocator *masterAllocator) allocID() (UniqueID, error) {
ctx := context.TODO()
resp, err := allocator.masterClient.AllocID(ctx, &masterpb.AllocIDRequest{
Base: &commonpb.MsgBase{
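
The hunks above rename the interface to allocator and the master-backed implementation to masterAllocator. A minimal, self-contained sketch of why the narrow interface matters; Timestamp, UniqueID and the fake implementation below are stand-ins invented for illustration, not the dataservice types:

package main

import "fmt"

// Stand-ins for the real typeutil aliases; assumptions for this sketch only.
type Timestamp = uint64
type UniqueID = int64

// allocator is the narrow interface callers depend on.
type allocator interface {
	allocTimestamp() (Timestamp, error)
	allocID() (UniqueID, error)
}

// fakeAllocator plays the role that masterAllocator plays in production.
type fakeAllocator struct {
	ts Timestamp
	id UniqueID
}

func (f *fakeAllocator) allocTimestamp() (Timestamp, error) { f.ts++; return f.ts, nil }
func (f *fakeAllocator) allocID() (UniqueID, error)         { f.id++; return f.id, nil }

func main() {
	// Any implementation satisfying allocator can be swapped in, e.g. in tests.
	var a allocator = &fakeAllocator{}
	ts, _ := a.allocTimestamp()
	id, _ := a.allocID()
	fmt.Println(ts, id)
}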

View File

@ -55,13 +55,11 @@ func (dp dummyPosProvider) GetDdlChannel() string {
//GetDdlChannel implements positionProvider
func (s *Server) GetDdlChannel() string {
s.getDDChannel()
return s.ddChannelMu.name
return s.ddChannelName
}
// getAllActiveVChannels gets all vchannels with unflushed segments
func (s *Server) getAllActiveVChannels() []vchannel {
s.getDDChannel()
segments := s.meta.GetUnFlushedSegments()
mChanCol := make(map[string]UniqueID)
@ -81,7 +79,7 @@ func (s *Server) getAllActiveVChannels() []vchannel {
vchans = append(vchans, vchannel{
CollectionID: colID,
DmlChannel: dmChan,
DdlChannel: s.ddChannelMu.name,
DdlChannel: s.ddChannelName,
})
}
return vchans

View File

@ -5,6 +5,7 @@ import (
"fmt"
"path"
"strconv"
"sync/atomic"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
@ -15,26 +16,8 @@ import (
"go.uber.org/zap"
)
func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
resp := &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{
NodeID: Params.NodeID,
Role: role,
StateCode: s.state.Load().(internalpb.StateCode),
},
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
// todo GetComponentStates need to be removed
//dataNodeStates, err := s.cluster.GetDataNodeStates(ctx)
//if err != nil {
//resp.Status.Reason = err.Error()
//return resp, nil
//}
//resp.SubcomponentStates = dataNodeStates
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
func (s *Server) isClosed() bool {
return atomic.LoadInt64(&s.isServing) == 0
}
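
Every handler in this file now guards itself with the atomic isServing flag instead of the old state atomic.Value check. A toy sketch of that guard, using hypothetical names and assuming only the pattern visible in the diff:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// server mirrors the dataservice pattern of an int64 serving flag.
type server struct {
	isServing int64 // 1 while serving, 0 otherwise
}

func (s *server) isClosed() bool { return atomic.LoadInt64(&s.isServing) == 0 }

// flush is a toy handler showing the early-return guard used by the RPCs above.
func (s *server) flush() error {
	if s.isClosed() {
		return errors.New("server is closed")
	}
	// ... seal segments, etc.
	return nil
}

func main() {
	s := &server{}
	fmt.Println(s.flush()) // server is closed
	atomic.StoreInt64(&s.isServing, 1)
	fmt.Println(s.flush()) // <nil>
}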
func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
@ -55,22 +38,17 @@ func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
}, nil
}
func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return nil, nil
}
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error) {
if !s.checkStateIsHealthy() {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "server is initializing",
}, nil
resp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
if s.isClosed() {
resp.Reason = "server is closed"
return resp, nil
}
if err := s.segAllocator.SealAllSegments(ctx, req.CollectionID); err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("Seal all segments error %s", err),
}, nil
resp.Reason = fmt.Sprintf("Seal all segments error %s", err)
return resp, nil
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -78,7 +56,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb
}
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
if !s.checkStateIsHealthy() {
if s.isClosed() {
return &datapb.AssignSegmentIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -151,7 +129,7 @@ func (s *Server) ShowSegments(ctx context.Context, req *datapb.ShowSegmentsReque
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if !s.checkStateIsHealthy() {
if s.isClosed() {
resp.Status.Reason = "server is initializing"
return resp, nil
}
@ -167,7 +145,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if !s.checkStateIsHealthy() {
if s.isClosed() {
resp.Status.Reason = "server is initializing"
return resp, nil
}
@ -284,7 +262,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if !s.checkStateIsHealthy() {
if s.isClosed() {
resp.Status.Reason = "data service is not healthy"
return resp, nil
}
@ -307,8 +285,8 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
resp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
if !s.checkStateIsHealthy() {
resp.Reason = "server is initializing"
if s.isClosed() {
resp.Reason = "server is closed"
return resp, nil
}
if s.flushMsgStream == nil {
@ -340,25 +318,18 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
zap.Any("meta", meta))
// write flush msg into segmentInfo/flush stream
msgPack := composeSegmentFlushMsgPack(req.SegmentID)
err = s.flushMsgStream.Produce(&msgPack)
if err != nil {
resp.Reason = err.Error()
return resp, nil
}
log.Debug("send segment flush msg", zap.Int64("id", req.SegmentID))
// set segment to SegmentState_Flushed
if err = s.meta.FlushSegment(req.SegmentID); err != nil {
log.Error("flush segment complete failed", zap.Error(err))
resp.Reason = err.Error()
return resp, nil
}
log.Debug("flush segment complete", zap.Int64("id", req.SegmentID))
s.segAllocator.DropSegment(ctx, req.SegmentID)
s.flushCh <- req.SegmentID
resp.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
// todo remove these rpc
func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
return nil, nil
}
func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return nil, nil
}

View File

@ -375,7 +375,20 @@ func (m *meta) GetUnFlushedSegments() []*datapb.SegmentInfo {
defer m.RUnlock()
segments := make([]*datapb.SegmentInfo, 0)
for _, info := range m.segments {
if info.State != commonpb.SegmentState_Flushed {
if info.State != commonpb.SegmentState_Flushing && info.State != commonpb.SegmentState_Flushed {
cInfo := proto.Clone(info).(*datapb.SegmentInfo)
segments = append(segments, cInfo)
}
}
return segments
}
func (m *meta) GetFlushingSegments() []*datapb.SegmentInfo {
m.RLock()
defer m.RUnlock()
segments := make([]*datapb.SegmentInfo, 0)
for _, info := range m.segments {
if info.State == commonpb.SegmentState_Flushing {
cInfo := proto.Clone(info).(*datapb.SegmentInfo)
segments = append(segments, cInfo)
}
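
The meta now distinguishes unflushed segments (neither Flushing nor Flushed) from flushing ones, which are replayed at startup by handleFlushingSegments further below. A simplified sketch of the state filtering, using a plain struct in place of datapb.SegmentInfo and a value copy in place of proto.Clone:

package main

import "fmt"

type segmentState int

const (
	stateGrowing segmentState = iota
	stateSealed
	stateFlushing
	stateFlushed
)

type segmentInfo struct {
	ID    int64
	State segmentState
}

// getFlushingSegments filters the in-memory map by state, copying each entry
// so callers cannot mutate the meta's view (the real code uses proto.Clone).
func getFlushingSegments(segments map[int64]*segmentInfo) []*segmentInfo {
	res := make([]*segmentInfo, 0)
	for _, info := range segments {
		if info.State == stateFlushing {
			c := *info
			res = append(res, &c)
		}
	}
	return res
}

func main() {
	segs := map[int64]*segmentInfo{
		1: {ID: 1, State: stateFlushing},
		2: {ID: 2, State: stateFlushed},
	}
	fmt.Println(len(getFlushingSegments(segs))) // 1
}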

View File

@ -27,7 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
func newMemoryMeta(allocator allocatorInterface) (*meta, error) {
func newMemoryMeta(allocator allocator) (*meta, error) {
memoryKV := memkv.NewMemoryKV()
return newMeta(memoryKV)
}

View File

@ -12,7 +12,6 @@ package dataservice
import (
"crypto/rand"
"fmt"
"math/big"
"github.com/milvus-io/milvus/internal/log"
@ -127,7 +126,6 @@ func newAllAssignPolicy() channelAssignPolicy {
func (p *allAssignPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
ret := make([]*datapb.DataNodeInfo, 0)
for _, node := range cluster {
fmt.Printf("xxxxnode: %v\n", node.Address)
has := false
for _, ch := range node.Channels {
if ch.Name == channel {
@ -143,7 +141,6 @@ func (p *allAssignPolicy) apply(cluster map[string]*datapb.DataNodeInfo, channel
State: datapb.ChannelWatchState_Uncomplete,
CollectionID: collectionID,
})
fmt.Printf("channelxxxx: %v\n", node.Channels)
ret = append(ret, node)
}

View File

@ -65,8 +65,7 @@ func (s *segAllocStats) loadSegmentsFromMeta() {
allocations: []*allocation{},
insertChannel: seg.InsertChannel,
lastExpireTime: seg.LastExpireTime,
sealed: seg.State == commonpb.SegmentState_Sealed ||
seg.State == commonpb.SegmentState_Flushing,
sealed: seg.State == commonpb.SegmentState_Sealed,
}
s.stats[seg.ID] = stat
}

View File

@ -41,7 +41,7 @@ func (err errRemainInSufficient) Error() string {
}
// segmentAllocator is used to allocate rows for segments and record the allocations.
type segmentAllocatorInterface interface {
type segmentAllocator interface {
// AllocSegment allocates rows and records the allocation.
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) (UniqueID, int64, Timestamp, error)
// DropSegment drops the segment from the allocator.
@ -52,10 +52,10 @@ type segmentAllocatorInterface interface {
GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
}
type segmentAllocator struct {
type channelSegmentAllocator struct {
mt *meta
mu sync.RWMutex
allocator allocatorInterface
allocator allocator
helper allocHelper
allocStats *segAllocStats
@ -68,13 +68,14 @@ type segmentAllocator struct {
type allocHelper struct {
afterCreateSegment func(segment *datapb.SegmentInfo) error
}
type allocOption struct {
apply func(alloc *segmentAllocator)
apply func(alloc *channelSegmentAllocator)
}
func withAllocHelper(helper allocHelper) allocOption {
return allocOption{
apply: func(alloc *segmentAllocator) { alloc.helper = helper },
apply: func(alloc *channelSegmentAllocator) { alloc.helper = helper },
}
}
@ -86,25 +87,25 @@ func defaultAllocHelper() allocHelper {
func withCalUpperLimitPolicy(policy calUpperLimitPolicy) allocOption {
return allocOption{
apply: func(alloc *segmentAllocator) { alloc.estimatePolicy = policy },
apply: func(alloc *channelSegmentAllocator) { alloc.estimatePolicy = policy },
}
}
func withAllocPolicy(policy allocatePolicy) allocOption {
return allocOption{
apply: func(alloc *segmentAllocator) { alloc.allocPolicy = policy },
apply: func(alloc *channelSegmentAllocator) { alloc.allocPolicy = policy },
}
}
func withSealPolicy(policy sealPolicy) allocOption {
return allocOption{
apply: func(alloc *segmentAllocator) { alloc.sealPolicy = policy },
apply: func(alloc *channelSegmentAllocator) { alloc.sealPolicy = policy },
}
}
func withFlushPolicy(policy flushPolicy) allocOption {
return allocOption{
apply: func(alloc *segmentAllocator) { alloc.flushPolicy = policy },
apply: func(alloc *channelSegmentAllocator) { alloc.flushPolicy = policy },
}
}
@ -124,8 +125,8 @@ func defaultFlushPolicy() flushPolicy {
return newFlushPolicyV1()
}
func newSegmentAllocator(meta *meta, allocator allocatorInterface, opts ...allocOption) *segmentAllocator {
alloc := &segmentAllocator{
func newSegmentAllocator(meta *meta, allocator allocator, opts ...allocOption) *channelSegmentAllocator {
alloc := &channelSegmentAllocator{
mt: meta,
allocator: allocator,
helper: defaultAllocHelper(),
@ -142,7 +143,7 @@ func newSegmentAllocator(meta *meta, allocator allocatorInterface, opts ...alloc
return alloc
}
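
The allocator keeps its functional-option style; each withXxx helper now mutates the renamed channelSegmentAllocator. A generic, self-contained sketch of that option pattern (the names below are illustrative, not the dataservice ones):

package main

import "fmt"

type sealPolicy func(total, allocated int64) bool

type segAlloc struct {
	sealAt sealPolicy
}

type option struct {
	apply func(*segAlloc)
}

// withSealPolicy overrides the default seal decision, mirroring the
// withSealPolicy / withFlushPolicy helpers in the diff above.
func withSealPolicy(p sealPolicy) option {
	return option{apply: func(a *segAlloc) { a.sealAt = p }}
}

// newSegAlloc sets defaults first, then applies each option in order.
func newSegAlloc(opts ...option) *segAlloc {
	a := &segAlloc{sealAt: func(total, allocated int64) bool { return allocated >= total }}
	for _, opt := range opts {
		opt.apply(a)
	}
	return a
}

func main() {
	a := newSegAlloc(withSealPolicy(func(total, allocated int64) bool {
		return allocated*4 >= total*3 // seal at 75% full
	}))
	fmt.Println(a.sealAt(1000, 800)) // true
}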
func (s *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
func (s *channelSegmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int64) (segID UniqueID, retCount int64, expireTime Timestamp, err error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
@ -185,7 +186,7 @@ func (s *segmentAllocator) AllocSegment(ctx context.Context, collectionID Unique
return
}
func (s *segmentAllocator) alloc(segStatus *segAllocStatus, numRows int64) (bool, error) {
func (s *channelSegmentAllocator) alloc(segStatus *segAllocStatus, numRows int64) (bool, error) {
info, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
@ -206,7 +207,7 @@ func (s *segmentAllocator) alloc(segStatus *segAllocStatus, numRows int64) (bool
return true, nil
}
func (s *segmentAllocator) genExpireTs() (Timestamp, error) {
func (s *channelSegmentAllocator) genExpireTs() (Timestamp, error) {
ts, err := s.allocator.allocTimestamp()
if err != nil {
return 0, err
@ -217,7 +218,7 @@ func (s *segmentAllocator) genExpireTs() (Timestamp, error) {
return expireTs, nil
}
func (s *segmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segAllocStatus, error) {
func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segAllocStatus, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
id, err := s.allocator.allocID()
@ -265,7 +266,7 @@ func (s *segmentAllocator) openNewSegment(ctx context.Context, collectionID Uniq
return s.allocStats.getSegmentBy(segmentInfo.ID), nil
}
func (s *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
func (s *channelSegmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := s.mt.GetCollection(collectionID)
if err != nil {
return -1, err
@ -273,7 +274,7 @@ func (s *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error)
return s.estimatePolicy.apply(collMeta.Schema)
}
func (s *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
func (s *channelSegmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
@ -281,7 +282,7 @@ func (s *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID)
s.allocStats.dropSegment(segmentID)
}
func (s *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) error {
func (s *channelSegmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
@ -290,7 +291,7 @@ func (s *segmentAllocator) SealAllSegments(ctx context.Context, collectionID Uni
return nil
}
func (s *segmentAllocator) GetFlushableSegments(ctx context.Context, channel string,
func (s *channelSegmentAllocator) GetFlushableSegments(ctx context.Context, channel string,
t Timestamp) ([]UniqueID, error) {
s.mu.Lock()
defer s.mu.Unlock()
@ -314,7 +315,7 @@ func (s *segmentAllocator) GetFlushableSegments(ctx context.Context, channel str
return ret, nil
}
func (s *segmentAllocator) tryToSealSegment() error {
func (s *channelSegmentAllocator) tryToSealSegment() error {
segments := s.allocStats.getAllSegments()
for _, segStatus := range segments {
if segStatus.sealed {
@ -335,7 +336,7 @@ func (s *segmentAllocator) tryToSealSegment() error {
return nil
}
func (s *segmentAllocator) checkSegmentSealed(segStatus *segAllocStatus) (bool, error) {
func (s *channelSegmentAllocator) checkSegmentSealed(segStatus *segAllocStatus) (bool, error) {
segMeta, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
@ -346,7 +347,7 @@ func (s *segmentAllocator) checkSegmentSealed(segStatus *segAllocStatus) (bool,
}
// only for test
func (s *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
func (s *channelSegmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
@ -355,16 +356,6 @@ func (s *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID)
return nil
}
func (s *segmentAllocator) HasSegment(ctx context.Context, segmentID UniqueID) bool {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.allocStats.stats[segmentID]
return ok
}
func createNewSegmentHelper(stream msgstream.MsgStream) allocHelper {
h := allocHelper{}
h.afterCreateSegment = func(segment *datapb.SegmentInfo) error {

View File

@ -1,5 +1,4 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// Copyright (C) 2019-2020 Zilliz. All rights reserved.//// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
@ -14,19 +13,18 @@ import (
"context"
"fmt"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
masterclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
"github.com/milvus-io/milvus/internal/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
@ -41,6 +39,8 @@ import (
const role = "dataservice"
const masterClientTimout = 20 * time.Second
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
@ -50,32 +50,28 @@ type Server struct {
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state atomic.Value
initOnce sync.Once
startOnce sync.Once
stopOnce sync.Once
isServing int64
kvClient *etcdkv.EtcdKV
meta *meta
segmentInfoStream msgstream.MsgStream
segAllocator segmentAllocatorInterface
segAllocator segmentAllocator
statsHandler *statsHandler
allocator allocatorInterface
allocator allocator
cluster *cluster
masterClient types.MasterService
ddChannelMu struct {
sync.Mutex
name string
}
ddChannelName string
flushCh chan UniqueID
flushMsgStream msgstream.MsgStream
msFactory msgstream.Factory
session *sessionutil.Session
activeCh <-chan bool
watchCh <-chan *sessionutil.SessionEvent
eventCh <-chan *sessionutil.SessionEvent
dataClientCreator func(addr string) (types.DataNode, error)
dataClientCreator func(addr string) (types.DataNode, error)
masterClientCreator func(addr string) (types.MasterService, error)
}
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
@ -83,29 +79,20 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
s := &Server{
ctx: ctx,
msFactory: factory,
flushCh: make(chan UniqueID, 1024),
}
s.dataClientCreator = func(addr string) (types.DataNode, error) {
return grpcdatanodeclient.NewClient(addr, 10*time.Second)
return datanodeclient.NewClient(addr, 10*time.Second)
}
s.masterClientCreator = func(addr string) (types.MasterService, error) {
return masterclient.NewClient(addr, Params.MetaRootPath,
[]string{Params.EtcdAddress}, masterClientTimout)
}
s.UpdateStateCode(internalpb.StateCode_Abnormal)
log.Debug("DataService", zap.Any("State", s.state.Load()))
return s, nil
}
func (s *Server) getInsertChannels() []string {
channels := make([]string, 0, Params.InsertChannelNum)
var i int64 = 0
for ; i < Params.InsertChannelNum; i++ {
channels = append(channels, Params.InsertChannelPrefixName+strconv.FormatInt(i, 10))
}
return channels
}
func (s *Server) SetMasterClient(masterClient types.MasterService) {
s.masterClient = masterClient
}
// Register registers the data service at etcd
func (s *Server) Register() error {
s.activeCh = s.session.Init(typeutil.DataServiceRole, Params.IP, true)
@ -114,56 +101,56 @@ func (s *Server) Register() error {
}
func (s *Server) Init() error {
s.initOnce.Do(func() {
s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
})
s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, []string{Params.EtcdAddress})
return nil
}
var startOnce sync.Once
func (s *Server) Start() error {
var err error
s.startOnce.Do(func() {
m := map[string]interface{}{
"PulsarAddress": Params.PulsarAddress,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = s.msFactory.SetParams(m)
if err != nil {
return
}
m := map[string]interface{}{
"PulsarAddress": Params.PulsarAddress,
"ReceiveBufSize": 1024,
"PulsarBufSize": 1024}
err = s.msFactory.SetParams(m)
if err != nil {
return err
}
if err = s.initMasterClient(); err != nil {
return err
}
if err = s.getDDChannelFromMaster(); err != nil {
return err
}
if err = s.initMeta(); err != nil {
return
}
if err = s.initMeta(); err != nil {
return err
}
if err = s.initCluster(); err != nil {
return
}
if err = s.initCluster(); err != nil {
return err
}
if err = s.initSegmentInfoChannel(); err != nil {
return
}
if err = s.initSegmentInfoChannel(); err != nil {
return err
}
s.allocator = newAllocator(s.masterClient)
s.allocator = newAllocator(s.masterClient)
s.startSegmentAllocator()
s.statsHandler = newStatsHandler(s.meta)
if err = s.initFlushMsgStream(); err != nil {
return
}
s.startSegmentAllocator()
s.statsHandler = newStatsHandler(s.meta)
if err = s.initFlushMsgStream(); err != nil {
return err
}
if err = s.initServiceDiscovery(); err != nil {
return
}
if err = s.initServiceDiscovery(); err != nil {
return err
}
s.startServerLoop()
s.startServerLoop()
s.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("start success")
})
return err
atomic.StoreInt64(&s.isServing, 1)
log.Debug("start success")
return nil
}
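
Start no longer wraps its body in sync.Once and returns a possibly stale err; each step returns immediately on failure and the serving flag is flipped only after every step succeeds. A condensed sketch of that shape, with placeholder steps:

package main

import (
	"fmt"
	"sync/atomic"
)

type server struct {
	isServing int64
}

// start runs each init step in order and only marks the server serving
// once all of them succeed, mirroring the restructured Start above.
func (s *server) start(steps ...func() error) error {
	for _, step := range steps {
		if err := step(); err != nil {
			return err // caller sees the real failure, nothing is swallowed
		}
	}
	atomic.StoreInt64(&s.isServing, 1)
	return nil
}

func main() {
	s := &server{}
	err := s.start(
		func() error { return nil },                   // e.g. init master client
		func() error { return fmt.Errorf("no etcd") }, // e.g. init meta
	)
	fmt.Println(err, atomic.LoadInt64(&s.isServing)) // no etcd 0
}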
func (s *Server) initCluster() error {
@ -198,8 +185,7 @@ func (s *Server) initServiceDiscovery() error {
return err
}
s.watchCh = s.session.WatchServices(typeutil.DataNodeRole, rev)
s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev)
return nil
}
@ -220,14 +206,6 @@ func (s *Server) initSegmentInfoChannel() error {
return nil
}
func (s *Server) UpdateStateCode(code internalpb.StateCode) {
s.state.Store(code)
}
func (s *Server) checkStateIsHealthy() bool {
return s.state.Load().(internalpb.StateCode) == internalpb.StateCode_Healthy
}
func (s *Server) initMeta() error {
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{
@ -260,26 +238,23 @@ func (s *Server) initFlushMsgStream() error {
return nil
}
func (s *Server) getDDChannel() error {
s.ddChannelMu.Lock()
defer s.ddChannelMu.Unlock()
if len(s.ddChannelMu.name) == 0 {
resp, err := s.masterClient.GetDdChannel(s.ctx)
if err = VerifyResponse(resp, err); err != nil {
return err
}
s.ddChannelMu.name = resp.Value
func (s *Server) getDDChannelFromMaster() error {
resp, err := s.masterClient.GetDdChannel(s.ctx)
if err = VerifyResponse(resp, err); err != nil {
return err
}
s.ddChannelName = resp.Value
return nil
}
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(4)
s.serverLoopWg.Add(5)
go s.startStatsChannel(s.serverLoopCtx)
go s.startDataNodeTtLoop(s.serverLoopCtx)
go s.startWatchService(s.serverLoopCtx)
go s.startActiveCheck(s.serverLoopCtx)
go s.startFlushLoop(s.serverLoopCtx)
}
func (s *Server) startStatsChannel(ctx context.Context) {
@ -287,10 +262,10 @@ func (s *Server) startStatsChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
statsStream, _ := s.msFactory.NewMsgStream(ctx)
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
log.Debug("DataService AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
log.Debug("dataservice AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
// try to restore last processed pos
pos, err := s.loadStreamLastPos(streamTypeStats)
log.Debug("load last pos of stats channel", zap.Any("pos", pos))
log.Debug("load last pos of stats channel", zap.Any("pos", pos), zap.Error(err))
if err == nil {
err = statsStream.Seek([]*internalpb.MsgPosition{pos})
if err != nil {
@ -314,13 +289,16 @@ func (s *Server) startStatsChannel(ctx context.Context) {
}
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_SegmentStatistics {
log.Warn("receive unknown msg from segment statistics channel", zap.Stringer("msgType", msg.Type()))
log.Warn("receive unknown msg from segment statistics channel",
zap.Stringer("msgType", msg.Type()))
continue
}
ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
for _, stat := range ssMsg.SegStats {
if err := s.statsHandler.HandleSegmentStat(stat); err != nil {
log.Error("handle segment stat error", zap.Int64("segmentID", stat.SegmentID), zap.Error(err))
log.Error("handle segment stat error",
zap.Int64("segmentID", stat.SegmentID),
zap.Error(err))
continue
}
}
@ -397,13 +375,14 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
}
func (s *Server) startWatchService(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
for {
select {
case <-ctx.Done():
log.Debug("watch service shutdown")
return
case event := <-s.watchCh:
case event := <-s.eventCh:
datanode := &datapb.DataNodeInfo{
Address: event.Session.Address,
Version: event.Session.ServerID,
@ -423,6 +402,7 @@ func (s *Server) startWatchService(ctx context.Context) {
}
func (s *Server) startActiveCheck(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
for {
@ -441,15 +421,74 @@ func (s *Server) startActiveCheck(ctx context.Context) {
}
}
var stopOnce sync.Once
func (s *Server) startFlushLoop(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ctx2, cancel := context.WithCancel(ctx)
defer cancel()
// send `Flushing` segments
go s.handleFlushingSegments(ctx2)
var err error
for {
select {
case <-ctx.Done():
log.Debug("flush loop shutdown")
return
case segmentID := <-s.flushCh:
// write flush msg into segmentInfo/flush stream
msgPack := composeSegmentFlushMsgPack(segmentID)
err = s.flushMsgStream.Produce(&msgPack)
if err != nil {
log.Error("produce flush msg failed",
zap.Int64("segmentID", segmentID),
zap.Error(err))
continue
}
log.Debug("send segment flush msg", zap.Int64("id", segmentID))
// set segment to SegmentState_Flushed
if err = s.meta.FlushSegment(segmentID); err != nil {
log.Error("flush segment complete failed", zap.Error(err))
continue
}
log.Debug("flush segment complete", zap.Int64("id", segmentID))
}
}
}
func (s *Server) handleFlushingSegments(ctx context.Context) {
segments := s.meta.GetFlushingSegments()
for _, segment := range segments {
select {
case <-ctx.Done():
return
case s.flushCh <- segment.ID:
}
}
}
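
SaveBinlogPaths now only pushes the segment ID into s.flushCh; producing the flush message and marking the segment Flushed happen in this loop, and segments already in the Flushing state at startup are replayed into the same channel. A self-contained sketch of that producer/consumer split, with flushFn standing in for the real flush work:

package main

import (
	"context"
	"fmt"
	"sync"
)

// flushLoop drains segment IDs and performs the flush work serially,
// in the spirit of startFlushLoop above.
func flushLoop(ctx context.Context, wg *sync.WaitGroup, ch <-chan int64, flushFn func(int64)) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case id := <-ch:
			flushFn(id)
		}
	}
}

// replayFlushing re-queues segments that were mid-flush before a restart,
// like handleFlushingSegments.
func replayFlushing(ctx context.Context, ch chan<- int64, ids []int64) {
	for _, id := range ids {
		select {
		case <-ctx.Done():
			return
		case ch <- id:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan int64, 1024)
	var wg sync.WaitGroup
	done := make(chan struct{})

	wg.Add(1)
	go flushLoop(ctx, &wg, ch, func(id int64) {
		fmt.Println("flushed segment", id)
		if id == 7 {
			close(done)
		}
	})

	go replayFlushing(ctx, ch, []int64{3, 5}) // segments already Flushing at startup
	ch <- 7                                   // a fresh SaveBinlogPaths request

	<-done
	cancel()
	wg.Wait()
}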
func (s *Server) initMasterClient() error {
var err error
s.masterClient, err = s.masterClientCreator("")
if err != nil {
return err
}
if err = s.masterClient.Init(); err != nil {
return err
}
return s.masterClient.Start()
}
func (s *Server) Stop() error {
s.stopOnce.Do(func() {
s.cluster.releaseSessions()
s.segmentInfoStream.Close()
s.flushMsgStream.Close()
s.stopServerLoop()
})
if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) {
return nil
}
log.Debug("dataservice server shutdown")
atomic.StoreInt64(&s.isServing, 0)
s.cluster.releaseSessions()
s.segmentInfoStream.Close()
s.flushMsgStream.Close()
s.stopServerLoop()
return nil
}
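
Stop drops the package-level stopOnce in favour of a compare-and-swap on the serving flag, so a second Stop (or a Stop before Start) is a no-op. A minimal sketch with a hypothetical cleanup callback:

package main

import (
	"fmt"
	"sync/atomic"
)

type server struct {
	isServing int64
}

// stop tears the server down at most once: only the call that wins the
// CAS from 1 to 0 runs the cleanup, mirroring Stop above.
func (s *server) stop(cleanup func()) {
	if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) {
		return
	}
	cleanup()
}

func main() {
	s := &server{isServing: 1}
	s.stop(func() { fmt.Println("released sessions, closed streams") })
	s.stop(func() { fmt.Println("never printed") })
}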

View File

@ -632,7 +632,6 @@ func TestDataNodeTtChannel(t *testing.T) {
assert.EqualValues(t, 1, len(resp.SegIDAssignments))
assign := resp.SegIDAssignments[0]
log.Debug("xxxxxxxxxxxxx", zap.Any("assign", assign))
resp2, err := svr.Flush(context.TODO(), &datapb.FlushRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
@ -751,6 +750,7 @@ func TestResumeChannel(t *testing.T) {
log.Debug("check segment in meta", zap.Any("id", seg.ID), zap.Any("has", has))
assert.True(t, has)
if has {
log.Debug("compare num rows", zap.Any("id", seg.ID), zap.Any("expected", segRows), zap.Any("actual", seg.NumRows))
assert.Equal(t, segRows, seg.NumRows)
}
}
@ -787,16 +787,12 @@ func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
svr, err := CreateServer(context.TODO(), factory)
assert.Nil(t, err)
ms := newMockMasterService()
err = ms.Init()
assert.Nil(t, err)
err = ms.Start()
assert.Nil(t, err)
defer ms.Stop()
svr.SetMasterClient(ms)
svr.dataClientCreator = func(addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.masterClientCreator = func(addr string) (types.MasterService, error) {
return newMockMasterService(), nil
}
assert.Nil(t, err)
err = svr.Init()
assert.Nil(t, err)
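
The test no longer constructs and injects a started mock master by hand; it overrides the masterClientCreator hook so Init wires the mock through the normal code path. A hedged sketch of that creator-injection style (all types below are stand-ins, not the real Milvus ones):

package main

import "fmt"

// masterService is a stand-in for types.MasterService.
type masterService interface {
	Init() error
	Start() error
}

type mockMaster struct{}

func (m *mockMaster) Init() error  { return nil }
func (m *mockMaster) Start() error { return nil }

// server holds a creator hook so tests can swap the real client for a mock.
type server struct {
	masterClientCreator func(addr string) (masterService, error)
	master              masterService
}

func (s *server) initMasterClient() error {
	mc, err := s.masterClientCreator("")
	if err != nil {
		return err
	}
	if err := mc.Init(); err != nil {
		return err
	}
	s.master = mc
	return mc.Start()
}

func main() {
	s := &server{masterClientCreator: func(addr string) (masterService, error) {
		return &mockMaster{}, nil // what newTestServer does with newMockMasterService
	}}
	fmt.Println(s.initMasterClient()) // <nil>
}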

View File

@ -104,9 +104,6 @@ func (s *Server) init() error {
return err
}
s.dataService.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("DataService", zap.Any("State", internalpb.StateCode_Initializing))
if s.newMasterServiceClient != nil {
log.Debug("DataService try to new master service client", zap.String("address", Params.MasterAddress))
masterServiceClient, err := s.newMasterServiceClient(Params.MasterAddress)
@ -129,7 +126,6 @@ func (s *Server) init() error {
panic(err)
}
log.Debug("DataService report MasterService is ready")
s.dataService.SetMasterClient(masterServiceClient)
}
if err := s.dataService.Init(); err != nil {

View File

@ -85,7 +85,7 @@ func (c *GrpcClient) reconnect() error {
addr, err := getMasterServiceAddr(c.sess)
if err != nil {
log.Debug("MasterServiceClient getMasterServiceAddr failed", zap.Error(err))
return nil
return err
}
log.Debug("MasterServiceClient getMasterServiceAddr success")
tracer := opentracing.GlobalTracer()

View File

@ -49,9 +49,9 @@ type DataService interface {
Component
TimeTickProvider
RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error)
RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error)
ShowSegments(ctx context.Context, req *datapb.ShowSegmentsRequest) (*datapb.ShowSegmentsResponse, error)
GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)