Send timetick to insert channel

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/4973/head^2
godchen 2021-02-03 20:04:29 +08:00 committed by yefu.chen
parent e7588d3250
commit 6e7e0b748a
12 changed files with 92 additions and 163 deletions


@@ -50,7 +50,6 @@ type DataService interface {
GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error)
GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
}
type MasterClient interface {
@@ -729,10 +728,6 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
if !s.checkStateIsHealthy() {
resp.Status.Reason = "data service is not healthy"
return resp, nil
}
nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
if err != nil {
resp.Status.Reason = err.Error()
@@ -742,27 +737,3 @@ func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.Collectio
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
return resp, nil
}
func (s *Server) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
resp := &datapb.SegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
if !s.checkStateIsHealthy() {
resp.Status.Reason = "data service is not healthy"
return resp, nil
}
infos := make([]*datapb.SegmentInfo, len(req.SegmentIDs))
for i, id := range req.SegmentIDs {
segmentInfo, err := s.meta.GetSegment(id)
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
infos[i] = segmentInfo
}
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
resp.Infos = infos
return resp, nil
}


@@ -137,7 +137,3 @@ func (c *Client) GetSegmentInfoChannel() (string, error) {
func (c *Client) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
return c.grpcClient.GetCount(context.Background(), req)
}
func (c *Client) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
return c.grpcClient.GetSegmentInfo(context.Background(), req)
}


@@ -25,7 +25,7 @@ type Service struct {
}
func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
return s.server.GetSegmentInfo(request)
panic("implement me")
}
func NewGrpcService(ctx context.Context) *Service {


@@ -322,5 +322,5 @@ func (s *Server) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*mi
}
func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
return s.impl.GetPersistentSegmentInfo(request)
panic("implement me")
}


@@ -249,12 +249,28 @@ func TestMasterService(t *testing.T) {
msg, ok := <-ddStream.Chan()
assert.True(t, ok)
assert.Equal(t, len(msg.Msgs), 1)
assert.True(t, len(msg.Msgs) == 2 || len(msg.Msgs) == 1)
createMsg, ok := (msg.Msgs[0]).(*ms.CreateCollectionMsg)
assert.True(t, ok)
createMeta, err := core.MetaTable.GetCollectionByName("testColl")
assert.Nil(t, err)
assert.Equal(t, createMsg.CollectionID, createMeta.ID)
assert.Equal(t, len(createMeta.PartitionIDs), 1)
if len(msg.Msgs) == 2 {
createPart, ok := (msg.Msgs[1]).(*ms.CreatePartitionMsg)
assert.True(t, ok)
assert.Equal(t, createPart.CollectionName, "testColl")
assert.Equal(t, createPart.PartitionID, createMeta.PartitionIDs[0])
} else {
msg, ok = <-ddStream.Chan()
assert.True(t, ok)
createPart, ok := (msg.Msgs[0]).(*ms.CreatePartitionMsg)
assert.True(t, ok)
assert.Equal(t, createPart.CollectionName, "testColl")
assert.Equal(t, createPart.PartitionID, createMeta.PartitionIDs[0])
}
req.Base.MsgID = 101
req.Base.Timestamp = 101
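The test change above tolerates both delivery shapes: the dd stream may batch the CreateCollection and CreatePartition messages into a single pack or split them across two consecutive packs. A minimal, self-contained sketch of that consumer pattern, with stand-in types rather than the real msgstream API:

package main

import "fmt"

type Msg string

// drain keeps reading packs until it has collected want messages, so it
// works whether the producer batched the messages or split them up.
func drain(ch <-chan []Msg, want int) []Msg {
	var got []Msg
	for len(got) < want {
		pack, ok := <-ch
		if !ok {
			break
		}
		got = append(got, pack...)
	}
	return got
}

func main() {
	ch := make(chan []Msg, 2)
	ch <- []Msg{"CreateCollection", "CreatePartition"} // batched into one pack
	fmt.Println(drain(ch, 2))

	ch <- []Msg{"CreateCollection"} // or split across two packs
	ch <- []Msg{"CreatePartition"}
	fmt.Println(drain(ch, 2))
}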


@@ -147,6 +147,26 @@ func (t *CreateCollectionReqTask) Execute() error {
return err
}
ddPart := internalpb2.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreatePartition,
MsgID: t.Req.Base.MsgID, //TODO, msg id
Timestamp: t.Req.Base.Timestamp + 1,
SourceID: t.Req.Base.SourceID,
},
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
PartitionName: Params.DefaultPartitionName,
DbID: 0, //TODO, not used
CollectionID: collMeta.ID,
PartitionID: partMeta.PartitionID,
}
err = t.core.DdCreatePartitionReq(&ddPart)
if err != nil {
return err
}
return nil
}
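The block added above makes the master follow every CreateCollection with a CreatePartition DDL for the default partition, reusing the originating request's identity but bumping the timestamp by one so consumers order the partition DDL strictly after the collection DDL. A hedged sketch of that derivation, with stand-in types:

package main

import "fmt"

type MsgBase struct {
	MsgID     int64
	Timestamp uint64
	SourceID  int64
}

// partitionBaseFrom derives the MsgBase of the follow-up CreatePartition
// request from the CreateCollection request's base.
func partitionBaseFrom(coll MsgBase) MsgBase {
	return MsgBase{
		MsgID:     coll.MsgID,         // the diff marks this TODO: no dedicated msg id yet
		Timestamp: coll.Timestamp + 1, // orders it strictly after the collection DDL
		SourceID:  coll.SourceID,
	}
}

func main() {
	coll := MsgBase{MsgID: 100, Timestamp: 100, SourceID: 1}
	fmt.Printf("%+v\n", partitionBaseFrom(coll)) // {MsgID:100 Timestamp:101 SourceID:1}
}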


@@ -51,6 +51,7 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
segmentInfoMsg := SegmentInfoMsg{}
flushCompletedMsg := FlushCompletedMsg{}
queryNodeSegStatsMsg := QueryNodeStatsMsg{}
segmentStatisticsMsg := SegmentStatisticsMsg{}
p := &ProtoUnmarshalDispatcher{}
p.TempMap = make(map[commonpb.MsgType]UnmarshalFunc)
@@ -68,6 +69,7 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_kFlush] = flushMsg.Unmarshal
p.TempMap[commonpb.MsgType_kSegmentInfo] = segmentInfoMsg.Unmarshal
p.TempMap[commonpb.MsgType_kSegmentFlushDone] = flushCompletedMsg.Unmarshal
p.TempMap[commonpb.MsgType_kSegmentStatistics] = segmentStatisticsMsg.Unmarshal
return p
}
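The two added lines register SegmentStatisticsMsg with the unmarshal dispatcher, so kSegmentStatistics payloads arriving on the new insert channels can be decoded. A simplified sketch of the dispatch-by-MsgType pattern; the types here are illustrative stand-ins, not the real ProtoUnmarshalDispatcher API:

package main

import (
	"errors"
	"fmt"
)

type MsgType int

const kSegmentStatistics MsgType = 1

type UnmarshalFunc func(payload []byte) (interface{}, error)

type Dispatcher struct {
	tempMap map[MsgType]UnmarshalFunc
}

// Unmarshal looks up the decoder registered for t and applies it to payload.
func (d *Dispatcher) Unmarshal(payload []byte, t MsgType) (interface{}, error) {
	fn, ok := d.tempMap[t]
	if !ok {
		return nil, errors.New("no unmarshal func registered for msg type")
	}
	return fn(payload)
}

func main() {
	d := &Dispatcher{tempMap: map[MsgType]UnmarshalFunc{
		// Mirrors p.TempMap[commonpb.MsgType_kSegmentStatistics] = segmentStatisticsMsg.Unmarshal.
		kSegmentStatistics: func(p []byte) (interface{}, error) { return string(p), nil },
	}}
	msg, err := d.Unmarshal([]byte("segment stats payload"), kSegmentStatistics)
	fmt.Println(msg, err)
}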


@@ -2,13 +2,10 @@ package proxynode
import (
"context"
"errors"
"log"
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -593,109 +590,3 @@ func (node *NodeImpl) Flush(request *milvuspb.FlushRequest) (*commonpb.Status, e
func (node *NodeImpl) GetDdChannel(request *commonpb.Empty) (*milvuspb.StringResponse, error) {
panic("implement me")
}
func (node *NodeImpl) GetPersistentSegmentInfo(req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
resp := &milvuspb.PersistentSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
segments, err := node.getSegmentsOfCollection(req.DbName, req.CollectionName)
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
infoResp, err := node.dataServiceClient.GetSegmentInfo(&datapb.SegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
SegmentIDs: segments,
})
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
if infoResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
resp.Status.Reason = infoResp.Status.Reason
return resp, nil
}
persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
for i, info := range infoResp.Infos {
persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
SegmentID: info.SegmentID,
CollectionID: info.CollectionID,
PartitionID: info.PartitionID,
OpenTime: info.OpenTime,
SealedTime: info.SealedTime,
FlushedTime: info.FlushedTime,
NumRows: info.NumRows,
MemSize: info.MemSize,
State: info.State,
}
}
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
resp.Infos = persistentInfos
return resp, nil
}
func (node *NodeImpl) getSegmentsOfCollection(dbName string, collectionName string) ([]UniqueID, error) {
describeCollectionResponse, err := node.masterClient.DescribeCollection(&milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeCollection,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
})
if err != nil {
return nil, err
}
if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, errors.New(describeCollectionResponse.Status.Reason)
}
collectionID := describeCollectionResponse.CollectionID
showPartitionsResp, err := node.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowPartitions,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
CollectionID: collectionID,
})
if err != nil {
return nil, err
}
if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, errors.New(showPartitionsResp.Status.Reason)
}
ret := make([]UniqueID, 0)
for _, partitionID := range showPartitionsResp.PartitionIDs {
showSegmentResponse, err := node.masterClient.ShowSegments(&milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowSegment,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
CollectionID: collectionID,
PartitionID: partitionID,
})
if err != nil {
return nil, err
}
if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, errors.New(showSegmentResponse.Status.Reason)
}
ret = append(ret, showSegmentResponse.SegmentIDs...)
}
return ret, nil
}


@@ -22,7 +22,6 @@ type MasterClient interface {
ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
}
type IndexServiceClient interface {
@@ -52,7 +51,6 @@ type DataServiceClient interface {
GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
}
type ProxyServiceClient interface {


@@ -7,6 +7,7 @@ import (
"os"
"path"
"runtime"
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
@@ -108,6 +109,16 @@ func (s *ServiceImpl) Init() error {
serviceTimeTickMsgStream.CreatePulsarProducers([]string{Params.ServiceTimeTickChannel})
log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel})
channels := make([]string, Params.InsertChannelNum)
var i int64 = 0
for ; i < Params.InsertChannelNum; i++ {
channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10)
}
insertTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
insertTickMsgStream.SetPulsarClient(Params.PulsarAddress)
insertTickMsgStream.CreatePulsarProducers(channels)
log.Println("create insert channel producers: ", channels)
nodeTimeTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
nodeTimeTickMsgStream.SetPulsarClient(Params.PulsarAddress)
nodeTimeTickMsgStream.CreatePulsarConsumers(Params.NodeTimeTickChannel,
@@ -116,7 +127,7 @@ func (s *ServiceImpl) Init() error {
ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{0}, 10)
log.Println("create soft time tick barrier ...")
s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream)
s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream)
log.Println("create time tick ...")
s.stateCode = internalpb2.StateCode_HEALTHY

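The Init changes fan the time tick out to the insert channels: each channel name is the configured prefix plus a numeric suffix, and producers are created for all of them. A runnable sketch of the naming loop, assuming hypothetical config values in place of ParamTable:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Hypothetical values; the real ones come from
	// msgChannel.chanNamePrefix.dataServiceInsertChannel and
	// dataservice.insertChannelNum.
	prefix := "insert-"
	var num int64 = 4

	channels := make([]string, num)
	for i := int64(0); i < num; i++ {
		channels[i] = prefix + strconv.FormatInt(i, 10)
	}
	fmt.Println(channels) // [insert-0 insert-1 insert-2 insert-3]
}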

@@ -9,11 +9,13 @@ import (
type ParamTable struct {
paramtable.BaseTable
PulsarAddress string
MasterAddress string
NodeTimeTickChannel []string
ServiceTimeTickChannel string
DataServiceAddress string
PulsarAddress string
MasterAddress string
NodeTimeTickChannel []string
ServiceTimeTickChannel string
DataServiceAddress string
InsertChannelPrefixName string
InsertChannelNum int64
}
var Params ParamTable
@@ -21,11 +23,17 @@ var Params ParamTable
func (pt *ParamTable) Init() {
pt.BaseTable.Init()
if err := pt.LoadYaml("advanced/data_service.yaml"); err != nil {
panic(err)
}
pt.initPulsarAddress()
pt.initMasterAddress()
pt.initNodeTimeTickChannel()
pt.initServiceTimeTickChannel()
pt.initDataServiceAddress()
pt.initInsertChannelPrefixName()
pt.initInsertChannelNum()
}
func (pt *ParamTable) initPulsarAddress() {
@@ -65,3 +73,15 @@ func (pt *ParamTable) initDataServiceAddress() {
// NOT USED NOW
pt.DataServiceAddress = "TODO: read from config"
}
func (pt *ParamTable) initInsertChannelNum() {
pt.InsertChannelNum = pt.ParseInt64("dataservice.insertChannelNum")
}
func (pt *ParamTable) initInsertChannelPrefixName() {
var err error
pt.InsertChannelPrefixName, err = pt.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel")
if err != nil {
panic(err)
}
}


@@ -20,7 +20,7 @@ type (
TimeTickImpl struct {
ttBarrier TimeTickBarrier
channel msgstream.MsgStream
channels []msgstream.MsgStream
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
@@ -58,17 +58,19 @@ func (tt *TimeTickImpl) Start() error {
},
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
err = tt.channel.Produce(&msgPack)
if err != nil {
log.Println("send time tick error: ", err)
} else {
for _, channel := range tt.channels {
err = channel.Broadcast(&msgPack)
if err != nil {
log.Println("send time tick error: ", err)
}
}
log.Println("send to master: ", current)
}
}
}()
tt.channel.Start()
for _, channel := range tt.channels {
channel.Start()
}
err := tt.ttBarrier.Start()
if err != nil {
@@ -79,13 +81,15 @@
}
func (tt *TimeTickImpl) Close() {
tt.channel.Close()
for _, channel := range tt.channels {
channel.Close()
}
tt.ttBarrier.Close()
tt.cancel()
tt.wg.Wait()
}
func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channel msgstream.MsgStream) TimeTick {
func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) TimeTick {
ctx1, cancel := context.WithCancel(ctx)
return &TimeTickImpl{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channel: channel}
return &TimeTickImpl{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels}
}
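newTimeTick now accepts a variadic list of streams, and Start/Close iterate over all of them, so each tick is broadcast to the service time tick channel and to every insert channel. A self-contained sketch of that fan-out loop, using a stand-in MsgStream interface rather than the real msgstream package:

package main

import "log"

type MsgPack struct{ Timestamp uint64 }

// MsgStream stands in for msgstream.MsgStream.
type MsgStream interface {
	Broadcast(*MsgPack) error
}

// sendTick mirrors the loop in Start(): a failure on one stream is logged
// but does not stop delivery to the remaining streams.
func sendTick(pack *MsgPack, channels []MsgStream) {
	for _, ch := range channels {
		if err := ch.Broadcast(pack); err != nil {
			log.Println("send time tick error: ", err)
		}
	}
}

type logStream struct{ name string }

func (s logStream) Broadcast(p *MsgPack) error {
	log.Println(s.name, "got tick", p.Timestamp)
	return nil
}

func main() {
	streams := []MsgStream{logStream{"serviceTimeTick"}, logStream{"insert-0"}, logStream{"insert-1"}}
	sendTick(&MsgPack{Timestamp: 42}, streams)
}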