Refactor data service

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
pull/4973/head^2
neza2017 2021-01-25 15:17:17 +08:00 committed by yefu.chen
parent e89e89b894
commit 1fe099775d
21 changed files with 444 additions and 561 deletions

View File

@ -1,93 +0,0 @@
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
const reTryCnt = 3
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port)
svr, err := msc.NewGrpcServer(ctx)
if err != nil {
panic(err)
}
log.Printf("proxy service address : %s", psc.Params.NetworkAddress())
//proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress())
//TODO, test proxy service GetComponentStates, before set
//if err = svr.SetProxyService(proxyService); err != nil {
// panic(err)
//}
log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port)
dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
cnt := 0
for cnt = 0; cnt < reTryCnt; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= reTryCnt {
panic("connect to data service failed")
}
//if err = svr.SetDataService(dataService); err != nil {
// panic(err)
//}
log.Printf("index service address : %s", is.Params.Address)
indexService := isc.NewClient(is.Params.Address)
if err = svr.SetIndexService(indexService); err != nil {
panic(err)
}
if err = svr.Start(); err != nil {
panic(err)
}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
sig := <-sc
log.Printf("Got %s signal to exit", sig.String())
_ = svr.Stop()
}

View File

@ -23,5 +23,4 @@ master:
IDAssignExpiration: 2000 # ms
maxPartitionNum: 4096
nodeID: 100
timeout: 5 # time out, 5 seconds
nodeID: 100

View File

@ -29,7 +29,6 @@ type (
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
collID2Info map[UniqueID]*collectionInfo // collection id to collection info
segID2Info map[UniqueID]*datapb.SegmentInfo // segment id to segment info
allocator allocator
ddLock sync.RWMutex
}
)
@ -50,12 +49,11 @@ func (err errCollectionNotFound) Error() string {
return fmt.Sprintf("collection %d not found", err.collectionID)
}
func newMeta(kv kv.TxnBase, allocator allocator) (*meta, error) {
func newMeta(kv kv.TxnBase) (*meta, error) {
mt := &meta{
client: kv,
collID2Info: make(map[UniqueID]*collectionInfo),
segID2Info: make(map[UniqueID]*datapb.SegmentInfo),
allocator: allocator,
}
err := mt.reloadFromKV()
if err != nil {
@ -120,29 +118,6 @@ func (meta *meta) GetCollection(collectionID UniqueID) (*collectionInfo, error)
return collectionInfo, nil
}
func (meta *meta) BuildSegment(collectionID UniqueID, partitionID UniqueID, channelRange []string) (*datapb.SegmentInfo, error) {
id, err := meta.allocator.allocID()
if err != nil {
return nil, err
}
ts, err := meta.allocator.allocTimestamp()
if err != nil {
return nil, err
}
return &datapb.SegmentInfo{
SegmentID: id,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannels: channelRange,
OpenTime: ts,
SealedTime: 0,
NumRows: 0,
MemSize: 0,
State: datapb.SegmentState_SegmentGrowing,
}, nil
}
func (meta *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
@ -191,23 +166,18 @@ func (meta *meta) GetSegment(segID UniqueID) (*datapb.SegmentInfo, error) {
return segmentInfo, nil
}
func (meta *meta) SealSegment(segID UniqueID) error {
func (meta *meta) OpenSegment(segmentID UniqueID, timetick Timestamp) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
segInfo, ok := meta.segID2Info[segID]
segInfo, ok := meta.segID2Info[segmentID]
if !ok {
return newErrSegmentNotFound(segID)
return newErrSegmentNotFound(segmentID)
}
ts, err := meta.allocator.allocTimestamp()
if err != nil {
return err
}
segInfo.SealedTime = ts
segInfo.State = datapb.SegmentState_SegmentSealed
segInfo.OpenTime = timetick
err = meta.saveSegmentInfo(segInfo)
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = meta.reloadFromKV()
return err
@ -215,7 +185,7 @@ func (meta *meta) SealSegment(segID UniqueID) error {
return nil
}
func (meta *meta) FlushSegment(segID UniqueID) error {
func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
@ -224,14 +194,45 @@ func (meta *meta) FlushSegment(segID UniqueID) error {
return newErrSegmentNotFound(segID)
}
ts, err := meta.allocator.allocTimestamp()
segInfo.SealedTime = timetick
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = meta.reloadFromKV()
return err
}
segInfo.FlushedTime = ts
segInfo.State = datapb.SegmentState_SegmentFlushed
return nil
}
err = meta.saveSegmentInfo(segInfo)
func (meta *meta) FlushSegment(segID UniqueID, timetick Timestamp) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
segInfo, ok := meta.segID2Info[segID]
if !ok {
return newErrSegmentNotFound(segID)
}
segInfo.FlushedTime = timetick
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = meta.reloadFromKV()
return err
}
return nil
}
func (meta *meta) SetSegmentState(segmentID UniqueID, state datapb.SegmentState) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
segInfo, ok := meta.segID2Info[segmentID]
if !ok {
return newErrSegmentNotFound(segmentID)
}
segInfo.State = state
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = meta.reloadFromKV()
return err
@ -316,3 +317,16 @@ func (meta *meta) saveSegmentInfo(segmentInfo *datapb.SegmentInfo) error {
func (meta *meta) removeSegmentInfo(segID UniqueID) error {
return meta.client.Remove("/segment/" + strconv.FormatInt(segID, 10))
}
func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelRange []string) (*datapb.SegmentInfo, error) {
return &datapb.SegmentInfo{
SegmentID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannels: channelRange,
OpenTime: 0,
SealedTime: 0,
NumRows: 0,
MemSize: 0,
State: datapb.SegmentState_SegmentGrowing,
}, nil
}

View File

@ -46,7 +46,9 @@ func TestSegment(t *testing.T) {
assert.Nil(t, err)
id, err := mockAllocator.allocID()
assert.Nil(t, err)
segmentInfo, err := meta.BuildSegment(id, 100, []string{"c1", "c2"})
segID, err := mockAllocator.allocID()
assert.Nil(t, err)
segmentInfo, err := BuildSegment(id, 100, segID, []string{"c1", "c2"})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
@ -59,9 +61,9 @@ func TestSegment(t *testing.T) {
ids = meta.GetSegmentsByCollectionAndPartitionID(id, 100)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, segmentInfo.SegmentID, ids[0])
err = meta.SealSegment(segmentInfo.SegmentID)
err = meta.SealSegment(segmentInfo.SegmentID, 100)
assert.Nil(t, err)
err = meta.FlushSegment(segmentInfo.SegmentID)
err = meta.FlushSegment(segmentInfo.SegmentID, 200)
assert.Nil(t, err)
info, err = meta.GetSegment(segmentInfo.SegmentID)
assert.Nil(t, err)

View File

@ -12,7 +12,7 @@ import (
func newMemoryMeta(allocator allocator) (*meta, error) {
memoryKV := memkv.NewMemoryKV()
return newMeta(memoryKV, allocator)
return newMeta(memoryKV)
}
type MockAllocator struct {

View File

@ -29,7 +29,8 @@ type ParamTable struct {
StatisticsChannelName string
TimeTickChannelName string
DataNodeNum int
SegmentChannelName string // todo init
SegmentInfoChannelName string
DataServiceSubscriptionName string
}
var Params ParamTable
@ -62,6 +63,8 @@ func (p *ParamTable) Init() {
p.initStatisticsChannelName()
p.initTimeTickChannelName()
p.initDataNodeNum()
p.initSegmentInfoChannelName()
p.initDataServiceSubscriptionName()
}
func (p *ParamTable) initAddress() {
@ -151,3 +154,11 @@ func (p *ParamTable) initTimeTickChannelName() {
func (p *ParamTable) initDataNodeNum() {
p.DataNodeNum = 2
}
func (p *ParamTable) initSegmentInfoChannelName() {
p.SegmentInfoChannelName = "segment-info-channel"
}
func (p *ParamTable) initDataServiceSubscriptionName() {
p.DataServiceSubscriptionName = "dataserive-sub"
}

View File

@ -2,7 +2,6 @@ package dataservice
import (
"fmt"
"log"
"strconv"
"sync"
"time"
@ -41,7 +40,7 @@ type segmentAllocator interface {
// ExpireAllocations check all allocations' expire time and remove the expired allocation.
ExpireAllocations(timeTick Timestamp) error
// SealAllSegments get all opened segment ids of collection. return success and failed segment ids
SealAllSegments(collectionID UniqueID) (bool, []UniqueID)
SealAllSegments(collectionID UniqueID)
// IsAllocationsExpired check all allocations of segment expired.
IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error)
}
@ -208,9 +207,6 @@ func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) error {
if !ok {
return nil
}
if err := allocator.mt.SealSegment(segmentID); err != nil {
return err
}
status.sealed = true
return nil
}
@ -246,23 +242,15 @@ func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID,
return status.lastExpireTime <= ts, nil
}
func (allocator *segmentAllocatorImpl) SealAllSegments(collectionID UniqueID) (bool, []UniqueID) {
func (allocator *segmentAllocatorImpl) SealAllSegments(collectionID UniqueID) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
failed := make([]UniqueID, 0)
success := true
for _, status := range allocator.segments {
if status.collectionID == collectionID {
if status.sealed {
continue
}
if err := allocator.mt.SealSegment(status.id); err != nil {
log.Printf("seal segment error: %s", err.Error())
failed = append(failed, status.id)
success = false
}
status.sealed = true
}
}
return success, failed
}

View File

@ -25,7 +25,9 @@ func TestAllocSegment(t *testing.T) {
Schema: schema,
})
assert.Nil(t, err)
segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c1", "c2"})
id, err := mockAllocator.allocID()
assert.Nil(t, err)
segmentInfo, err := BuildSegment(collID, 100, id, []string{"c1", "c2"})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
@ -76,7 +78,9 @@ func TestSealSegment(t *testing.T) {
assert.Nil(t, err)
var lastSegID UniqueID
for i := 0; i < 10; i++ {
segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c" + strconv.Itoa(i)})
id, err := mockAllocator.allocID()
assert.Nil(t, err)
segmentInfo, err := BuildSegment(collID, 100, id, []string{"c" + strconv.Itoa(i)})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
@ -87,9 +91,7 @@ func TestSealSegment(t *testing.T) {
err = segAllocator.SealSegment(lastSegID)
assert.Nil(t, err)
success, ids := segAllocator.SealAllSegments(collID)
assert.True(t, success)
assert.EqualValues(t, 0, len(ids))
segAllocator.SealAllSegments(collID)
sealedSegments, err := segAllocator.GetSealedSegments()
assert.Nil(t, err)
assert.EqualValues(t, 10, len(sealedSegments))
@ -111,7 +113,9 @@ func TestExpireSegment(t *testing.T) {
Schema: schema,
})
assert.Nil(t, err)
segmentInfo, err := meta.BuildSegment(collID, 100, []string{"c1", "c2"})
id, err := mockAllocator.allocID()
assert.Nil(t, err)
segmentInfo, err := BuildSegment(collID, 100, id, []string{"c1", "c2"})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)

View File

@ -6,6 +6,8 @@ import (
"log"
"sync"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
@ -28,6 +30,7 @@ const role = "dataservice"
type DataService interface {
typeutil.Service
typeutil.Component
RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
Flush(req *datapb.FlushRequest) (*commonpb.Status, error)
@ -35,36 +38,35 @@ type DataService interface {
ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error)
GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
GetSegmentInfoChannel() (string, error)
GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
GetTimeTickChannel() (*milvuspb.StringResponse, error)
GetStatisticsChannel() (*milvuspb.StringResponse, error)
}
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
Server struct {
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state internalpb2.StateCode
client *etcdkv.EtcdKV
meta *meta
segAllocator segmentAllocator
statsHandler *statsHandler
insertChannelMgr *insertChannelManager
allocator allocator
cluster *dataNodeCluster
msgProducer *timesync.MsgProducer
registerFinishCh chan struct{}
masterClient *masterservice.GrpcClient
ttMsgStream msgstream.MsgStream
ddChannelName string
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
state internalpb2.StateCode
client *etcdkv.EtcdKV
meta *meta
segAllocator segmentAllocator
statsHandler *statsHandler
insertChannelMgr *insertChannelManager
allocator allocator
cluster *dataNodeCluster
msgProducer *timesync.MsgProducer
registerFinishCh chan struct{}
masterClient *masterservice.GrpcClient
ttMsgStream msgstream.MsgStream
ddChannelName string
segmentInfoStream msgstream.MsgStream
}
)
@ -105,6 +107,7 @@ func (s *Server) Start() error {
return err
}
s.initSegmentInfoChannel()
s.startServerLoop()
s.state = internalpb2.StateCode_HEALTHY
log.Println("start success")
@ -118,7 +121,7 @@ func (s *Server) initMeta() error {
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
s.client = etcdKV
s.meta, err = newMeta(etcdKV, s.allocator)
s.meta, err = newMeta(etcdKV)
if err != nil {
return err
}
@ -132,7 +135,10 @@ func (s *Server) waitDataNodeRegister() {
}
func (s *Server) initMsgProducer() error {
s.ttMsgStream = pulsarms.NewPulsarTtMsgStream(s.ctx, 1024)
ttMsgStream := pulsarms.NewPulsarTtMsgStream(s.ctx, 1024)
ttMsgStream.SetPulsarClient(Params.PulsarAddress)
ttMsgStream.CreatePulsarConsumers([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
s.ttMsgStream = ttMsgStream
s.ttMsgStream.Start()
timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
@ -154,6 +160,8 @@ func (s *Server) startServerLoop() {
func (s *Server) startStatsChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
statsStream.SetPulsarClient(Params.PulsarAddress)
statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
statsStream.Start()
defer statsStream.Close()
for {
@ -175,6 +183,14 @@ func (s *Server) startStatsChannel(ctx context.Context) {
}
}
func (s *Server) initSegmentInfoChannel() {
segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024)
segmentInfoStream.SetPulsarClient(Params.PulsarAddress)
segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName})
s.segmentInfoStream = segmentInfoStream
s.segmentInfoStream.Start()
}
func (s *Server) loadMetaFromMaster() error {
log.Println("loading collection meta from master")
collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
@ -236,6 +252,7 @@ func (s *Server) loadMetaFromMaster() error {
func (s *Server) Stop() error {
s.ttMsgStream.Close()
s.msgProducer.Close()
s.segmentInfoStream.Close()
s.stopServerLoop()
return nil
}
@ -266,22 +283,12 @@ func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
return resp, nil
}
func (s *Server) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: Params.TimeTickChannelName,
}, nil
func (s *Server) GetTimeTickChannel() (string, error) {
return Params.TimeTickChannelName, nil
}
func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: Params.StatisticsChannelName,
}, nil
func (s *Server) GetStatisticsChannel() (string, error) {
return Params.StatisticsChannelName, nil
}
func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
@ -291,7 +298,7 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
},
}
s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID)
if len(s.ddChannelName) == 0 {
if s.ddChannelName == "" {
resp, err := s.masterClient.GetDdChannel()
if err != nil {
ret.Status.Reason = err.Error()
@ -306,21 +313,14 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
{Key: "DDChannelName", Value: s.ddChannelName},
{Key: "SegmentStatisticsChannelName", Value: Params.StatisticsChannelName},
{Key: "TimeTickChannelName", Value: Params.TimeTickChannelName},
{Key: "CompleteFlushChannelName", Value: Params.SegmentChannelName},
{Key: "CompleteFlushChannelName", Value: Params.SegmentInfoChannelName},
},
}
return ret, nil
}
func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) {
success, fails := s.segAllocator.SealAllSegments(req.CollectionID)
log.Printf("sealing failed segments: %v", fails)
if !success {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("flush failed, %d segment can not be sealed", len(fails)),
}, nil
}
s.segAllocator.SealAllSegments(req.CollectionID)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
@ -383,7 +383,12 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
if err != nil {
return err
}
segmentInfo, err := s.meta.BuildSegment(collectionID, partitionID, group)
id, err := s.allocator.allocID()
if err != nil {
return err
}
segmentInfo, err := BuildSegment(collectionID, partitionID, id, group)
if err != nil {
return err
}
@ -425,22 +430,14 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
panic("implement me")
}
func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
resp := &internalpb2.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
contains, ret := s.insertChannelMgr.ContainsCollection(req.CollectionID)
if contains {
resp.Values = ret
return resp, nil
return ret, nil
}
channelGroups, err := s.insertChannelMgr.AllocChannels(req.CollectionID, s.cluster.GetNumOfNodes())
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
return nil, err
}
channels := make([]string, Params.InsertChannelNumPerCollection)
@ -449,8 +446,7 @@ func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalp
}
s.cluster.WatchInsertChannels(channelGroups)
resp.Values = channels
return resp, nil
return channels, nil
}
func (s *Server) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
@ -462,3 +458,7 @@ func (s *Server) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*dat
// todo implement
return nil, nil
}
func (s *Server) GetSegmentInfoChannel() (string, error) {
return Params.SegmentInfoChannelName, nil
}

View File

@ -20,6 +20,10 @@ func (handler *statsHandler) HandleSegmentStat(segStats *internalpb2.SegmentStat
return err
}
//if segStats.IsNewSegment {
// segMeta.OpenTime = segStats.CreateTime
// segMeta.segStats.StartPositions
//}
segMeta.NumRows = segStats.NumRows
segMeta.MemSize = segStats.MemorySize

View File

@ -66,7 +66,7 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context)
for {
select {
case <-ctx.Done():
log.Println("data node time tick watcher clsoed")
log.Println("data node time tick watcher closed")
return
case msg := <-watcher.msgQueue:
segments, err := watcher.allocator.GetSealedSegments()
@ -86,6 +86,10 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context)
log.Println(err.Error())
continue
}
if err = watcher.meta.SetSegmentState(id, datapb.SegmentState_SegmentSealed); err != nil {
log.Println(err.Error())
continue
}
watcher.cluster.FlushSegment(&datapb.FlushSegRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,

View File

@ -2,14 +2,13 @@ package dataservice
import (
"context"
"errors"
"time"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
@ -59,12 +58,26 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
return c.grpcClient.GetComponentStates(context.Background(), nil)
}
func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return c.grpcClient.GetTimeTickChannel(context.Background(), nil)
func (c *Client) GetTimeTickChannel() (string, error) {
resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), nil)
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
}
func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
return c.grpcClient.GetStatisticsChannel(context.Background(), nil)
func (c *Client) GetStatisticsChannel() (string, error) {
resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), nil)
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
}
func (c *Client) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
@ -91,8 +104,15 @@ func (c *Client) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
return c.grpcClient.GetInsertBinlogPaths(context.Background(), req)
}
func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return c.grpcClient.GetInsertChannels(context.Background(), req)
func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
resp, err := c.grpcClient.GetInsertChannels(context.Background(), req)
if err != nil {
return nil, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, errors.New(resp.Status.Reason)
}
return resp.Values, nil
}
func (c *Client) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
@ -102,3 +122,14 @@ func (c *Client) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*d
func (c *Client) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) {
return c.grpcClient.GetPartitionStatistics(context.Background(), req)
}
func (c *Client) GetSegmentInfoChannel() (string, error) {
resp, err := c.grpcClient.GetSegmentInfoChannel(context.Background(), nil)
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
}

View File

@ -108,7 +108,20 @@ func (s *Service) GetInsertBinlogPaths(ctx context.Context, request *datapb.Inse
}
func (s *Service) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return s.server.GetInsertChannels(request)
resp := &internalpb2.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
channels, err := s.server.GetInsertChannels(request)
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
}
resp.Values = channels
return resp, nil
}
func (s *Service) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
@ -124,9 +137,51 @@ func (s *Service) GetComponentStates(ctx context.Context, empty *commonpb.Empty)
}
func (s *Service) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.server.GetTimeTickChannel()
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
channel, err := s.server.GetTimeTickChannel()
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
}
resp.Value = channel
return resp, nil
}
func (s *Service) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.server.GetStatisticsChannel()
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
channel, err := s.server.GetStatisticsChannel()
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
}
resp.Value = channel
return resp, nil
}
func (s *Service) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
channel, err := s.server.GetSegmentInfoChannel()
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
}
resp.Value = channel
return resp, nil
}

View File

@ -5,7 +5,6 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/errors"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
@ -57,93 +56,63 @@ func (c *GrpcClient) Stop() error {
}
func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{})
return c.grpcClient.GetComponentStatesRPC(context.Background(), &commonpb.Empty{})
}
//DDL request
func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.CreateCollection(ctx, in)
return c.grpcClient.CreateCollection(context.Background(), in)
}
func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DropCollection(ctx, in)
return c.grpcClient.DropCollection(context.Background(), in)
}
func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.HasCollection(ctx, in)
return c.grpcClient.HasCollection(context.Background(), in)
}
func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DescribeCollection(ctx, in)
return c.grpcClient.DescribeCollection(context.Background(), in)
}
func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.ShowCollections(ctx, in)
return c.grpcClient.ShowCollections(context.Background(), in)
}
func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.CreatePartition(ctx, in)
return c.grpcClient.CreatePartition(context.Background(), in)
}
func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DropPartition(ctx, in)
return c.grpcClient.DropPartition(context.Background(), in)
}
func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.HasPartition(ctx, in)
return c.grpcClient.HasPartition(context.Background(), in)
}
func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.ShowPartitions(ctx, in)
return c.grpcClient.ShowPartitions(context.Background(), in)
}
//index builder service
func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.CreateIndex(ctx, in)
return c.grpcClient.CreateIndex(context.Background(), in)
}
func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DescribeIndex(ctx, in)
return c.grpcClient.DescribeIndex(context.Background(), in)
}
//global timestamp allocator
func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.AllocTimestamp(ctx, in)
return c.grpcClient.AllocTimestamp(context.Background(), in)
}
func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.AllocID(ctx, in)
return c.grpcClient.AllocID(context.Background(), in)
}
//receiver time tick from proxy service, and put it into this channel
func (c *GrpcClient) GetTimeTickChannel() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{})
rsp, err := c.grpcClient.GetTimeTickChannelRPC(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
@ -155,9 +124,7 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) {
//receive ddl from rpc and time tick from proxy service, and put them into this channel
func (c *GrpcClient) GetDdChannel() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{})
rsp, err := c.grpcClient.GetDdChannelRPC(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
@ -169,9 +136,7 @@ func (c *GrpcClient) GetDdChannel() (string, error) {
//just define a channel, not used currently
func (c *GrpcClient) GetStatisticsChannel() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{})
rsp, err := c.grpcClient.GetStatisticsChannelRPC(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
@ -182,13 +147,9 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) {
}
func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DescribeSegment(ctx, in)
return c.grpcClient.DescribeSegment(context.Background(), in)
}
func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.ShowSegments(ctx, in)
return c.grpcClient.ShowSegments(context.Background(), in)
}

View File

@ -1,7 +1,6 @@
package masterservice
import (
"context"
"fmt"
"math/rand"
"regexp"
@ -27,7 +26,7 @@ func TestGrpcService(t *testing.T) {
//cms.Params.Address = "127.0.0.1"
cms.Params.Port = (randVal % 100) + 10000
svr, err := NewGrpcServer(context.Background())
svr, err := NewGrpcServer()
assert.Nil(t, err)
// cms.Params.NodeID = 0

View File

@ -6,7 +6,6 @@ import (
"net"
"sync"
"github.com/zilliztech/milvus-distributed/internal/errors"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
@ -27,10 +26,10 @@ type GrpcServer struct {
cancel context.CancelFunc
}
func NewGrpcServer(ctx context.Context) (*GrpcServer, error) {
func NewGrpcServer() (*GrpcServer, error) {
s := &GrpcServer{}
var err error
s.ctx, s.cancel = context.WithCancel(ctx)
s.ctx, s.cancel = context.WithCancel(context.Background())
if s.core, err = cms.NewCore(s.ctx); err != nil {
return nil, err
}
@ -74,30 +73,6 @@ func (s *GrpcServer) Stop() error {
return err
}
func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set proxy service failed")
}
return c.SetProxyService(p)
}
func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set data service failed")
}
return c.SetDataService(p)
}
func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set index service failed")
}
return c.SetIndexService(p)
}
func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.core.GetComponentStates()
}

View File

@ -2,7 +2,6 @@ package masterservice
import (
"context"
"fmt"
"log"
"math/rand"
"strconv"
@ -736,13 +735,6 @@ func (c *Core) GetStatisticsChannel() (string, error) {
}
func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &CreateCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -766,13 +758,6 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb
}
func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &DropCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -795,16 +780,6 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta
}
func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
Value: false,
}, nil
}
t := &HasCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -834,17 +809,6 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR
}
func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
Schema: nil,
CollectionID: 0,
}, nil
}
t := &DescribeCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -872,16 +836,6 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv
}
func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.ShowCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
CollectionNames: nil,
}, nil
}
t := &ShowCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -911,13 +865,6 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh
}
func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &CreatePartitionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -940,13 +887,6 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S
}
func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &DropPartitionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -969,16 +909,6 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu
}
func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
Value: false,
}, nil
}
t := &HasPartitionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -1008,17 +938,6 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes
}
func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.ShowPartitionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
PartitionNames: nil,
PartitionIDs: nil,
}, nil
}
t := &ShowPartitionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -1049,13 +968,6 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show
}
func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &CreateIndexReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -1078,16 +990,6 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e
}
func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
IndexDescriptions: nil,
}, nil
}
t := &DescribeIndexReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -1118,16 +1020,6 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr
}
func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.DescribeSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
IndexID: 0,
}, nil
}
t := &DescribeSegmentReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
@ -1158,16 +1050,6 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D
}
func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.ShowSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
SegmentIDs: nil,
}, nil
}
t := &ShowSegmentReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),

View File

@ -27,8 +27,6 @@ type ParamTable struct {
MaxPartitionNum int64
DefaultPartitionName string
DefaultIndexName string
Timeout int
}
func (p *ParamTable) Init() {
@ -56,8 +54,6 @@ func (p *ParamTable) Init() {
p.initMaxPartitionNum()
p.initDefaultPartitionName()
p.initDefaultIndexName()
p.initTimeout()
}
func (p *ParamTable) initAddress() {
@ -167,7 +163,3 @@ func (p *ParamTable) initDefaultIndexName() {
}
p.DefaultIndexName = name
}
func (p *ParamTable) initTimeout() {
p.Timeout = p.ParseInt("master.timeout")
}

View File

@ -50,7 +50,4 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.DefaultIndexName, "")
t.Logf("default index name = %s", Params.DefaultIndexName)
assert.NotZero(t, Params.Timeout)
t.Logf("master timeout = %d", Params.Timeout)
}

View File

@ -137,6 +137,8 @@ message SegmentInfo {
int64 num_rows=8;
int64 mem_size=9;
SegmentState state = 10;
repeated internal.MsgPosition start_position = 11;
repeated internal.MsgPosition end_position = 12;
}
message SegmentMsg{
@ -201,6 +203,8 @@ service DataService {
rpc GetComponentStates(common.Empty) returns (internal.ComponentStates) {}
rpc GetTimeTickChannel(common.Empty) returns(milvus.StringResponse) {}
rpc GetStatisticsChannel(common.Empty) returns(milvus.StringResponse){}
rpc GetSegmentInfoChannel(common.Empty) returns (milvus.StringResponse){}
}
service DataNode {

View File

@ -998,19 +998,21 @@ func (m *FlushSegRequest) GetSegmentIDs() []int64 {
}
type SegmentInfo struct {
SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"`
CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,3,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
InsertChannels []string `protobuf:"bytes,4,rep,name=insert_channels,json=insertChannels,proto3" json:"insert_channels,omitempty"`
OpenTime uint64 `protobuf:"varint,5,opt,name=open_time,json=openTime,proto3" json:"open_time,omitempty"`
SealedTime uint64 `protobuf:"varint,6,opt,name=sealed_time,json=sealedTime,proto3" json:"sealed_time,omitempty"`
FlushedTime uint64 `protobuf:"varint,7,opt,name=flushed_time,json=flushedTime,proto3" json:"flushed_time,omitempty"`
NumRows int64 `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"`
MemSize int64 `protobuf:"varint,9,opt,name=mem_size,json=memSize,proto3" json:"mem_size,omitempty"`
State SegmentState `protobuf:"varint,10,opt,name=state,proto3,enum=milvus.proto.data.SegmentState" json:"state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"`
CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,3,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
InsertChannels []string `protobuf:"bytes,4,rep,name=insert_channels,json=insertChannels,proto3" json:"insert_channels,omitempty"`
OpenTime uint64 `protobuf:"varint,5,opt,name=open_time,json=openTime,proto3" json:"open_time,omitempty"`
SealedTime uint64 `protobuf:"varint,6,opt,name=sealed_time,json=sealedTime,proto3" json:"sealed_time,omitempty"`
FlushedTime uint64 `protobuf:"varint,7,opt,name=flushed_time,json=flushedTime,proto3" json:"flushed_time,omitempty"`
NumRows int64 `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"`
MemSize int64 `protobuf:"varint,9,opt,name=mem_size,json=memSize,proto3" json:"mem_size,omitempty"`
State SegmentState `protobuf:"varint,10,opt,name=state,proto3,enum=milvus.proto.data.SegmentState" json:"state,omitempty"`
StartPosition []*internalpb2.MsgPosition `protobuf:"bytes,11,rep,name=start_position,json=startPosition,proto3" json:"start_position,omitempty"`
EndPosition []*internalpb2.MsgPosition `protobuf:"bytes,12,rep,name=end_position,json=endPosition,proto3" json:"end_position,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SegmentInfo) Reset() { *m = SegmentInfo{} }
@ -1108,6 +1110,20 @@ func (m *SegmentInfo) GetState() SegmentState {
return SegmentState_SegmentNone
}
func (m *SegmentInfo) GetStartPosition() []*internalpb2.MsgPosition {
if m != nil {
return m.StartPosition
}
return nil
}
func (m *SegmentInfo) GetEndPosition() []*internalpb2.MsgPosition {
if m != nil {
return m.EndPosition
}
return nil
}
type SegmentMsg struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Segment *SegmentInfo `protobuf:"bytes,2,opt,name=segment,proto3" json:"segment,omitempty"`
@ -1548,104 +1564,106 @@ func init() {
func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) }
var fileDescriptor_3385cd32ad6cfe64 = []byte{
// 1541 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0xdf, 0x6e, 0x1b, 0x45,
0x17, 0xcf, 0x7a, 0xfd, 0xf7, 0xd8, 0xb1, 0xdd, 0x49, 0x9a, 0xa4, 0x6e, 0xbf, 0x36, 0xd9, 0x4f,
0x6d, 0xd2, 0xea, 0xfb, 0x12, 0x94, 0x0a, 0x0a, 0x37, 0x88, 0xa6, 0x6e, 0x23, 0xab, 0x4d, 0x14,
0x8d, 0x0b, 0x15, 0xbd, 0xb1, 0xd6, 0xf6, 0xc4, 0x19, 0xf0, 0xee, 0x9a, 0x9d, 0x71, 0x93, 0xe6,
0x06, 0xae, 0x40, 0x42, 0x48, 0x70, 0xc5, 0x05, 0x5c, 0xf3, 0x02, 0xf0, 0x00, 0xbc, 0x02, 0x8f,
0xc1, 0x53, 0x20, 0xb4, 0x33, 0xb3, 0xff, 0xec, 0x75, 0x6c, 0xdc, 0x96, 0xde, 0x79, 0xce, 0xfe,
0xe6, 0x9c, 0x33, 0xe7, 0x9c, 0xf9, 0x9d, 0x33, 0x06, 0xd4, 0x35, 0xb9, 0xd9, 0x62, 0xc4, 0x7d,
0x41, 0x3b, 0x64, 0x7b, 0xe0, 0x3a, 0xdc, 0x41, 0x97, 0x2c, 0xda, 0x7f, 0x31, 0x64, 0x72, 0xb5,
0xed, 0x01, 0x6a, 0xa5, 0x8e, 0x63, 0x59, 0x8e, 0x2d, 0x45, 0xb5, 0x32, 0xb5, 0x39, 0x71, 0x6d,
0xb3, 0xaf, 0xd6, 0xa5, 0xe8, 0x06, 0xe3, 0x4b, 0x58, 0xc2, 0xa4, 0x47, 0x19, 0x27, 0xee, 0xa1,
0xd3, 0x25, 0x98, 0x7c, 0x31, 0x24, 0x8c, 0xa3, 0x77, 0x20, 0xdd, 0x36, 0x19, 0x59, 0xd3, 0xd6,
0xb5, 0xad, 0xe2, 0xee, 0xb5, 0xed, 0x98, 0x11, 0xa5, 0xfe, 0x80, 0xf5, 0xf6, 0x4c, 0x46, 0xb0,
0x40, 0xa2, 0xf7, 0x20, 0x67, 0x76, 0xbb, 0x2e, 0x61, 0x6c, 0x2d, 0x75, 0xc1, 0xa6, 0xfb, 0x12,
0x83, 0x7d, 0xb0, 0xf1, 0xbd, 0x06, 0xcb, 0x71, 0x0f, 0xd8, 0xc0, 0xb1, 0x19, 0x41, 0x7b, 0x50,
0xa4, 0x36, 0xe5, 0xad, 0x81, 0xe9, 0x9a, 0x16, 0x53, 0x9e, 0x6c, 0xc4, 0x95, 0x06, 0x47, 0x6b,
0xd8, 0x94, 0x1f, 0x09, 0x20, 0x06, 0x1a, 0xfc, 0x46, 0x77, 0x21, 0xcb, 0xb8, 0xc9, 0x87, 0xbe,
0x4f, 0x57, 0x13, 0x7d, 0x6a, 0x0a, 0x08, 0x56, 0x50, 0xe3, 0x0f, 0x0d, 0x4a, 0x4d, 0xd2, 0x6b,
0xd4, 0xfd, 0x60, 0x2c, 0x43, 0xa6, 0xe3, 0x0c, 0x6d, 0x2e, 0x7c, 0x58, 0xc4, 0x72, 0x81, 0xd6,
0xa1, 0xd8, 0x39, 0x31, 0x6d, 0x9b, 0xf4, 0x0f, 0x4d, 0x8b, 0x08, 0x03, 0x05, 0x1c, 0x15, 0x21,
0x03, 0x4a, 0x1d, 0xa7, 0xdf, 0x27, 0x1d, 0x4e, 0x1d, 0xbb, 0x51, 0x5f, 0xd3, 0xd7, 0xb5, 0x2d,
0x1d, 0xc7, 0x64, 0x9e, 0x96, 0x81, 0xe9, 0x72, 0xaa, 0x20, 0x69, 0x01, 0x89, 0x8a, 0xd0, 0x55,
0x28, 0x78, 0x3b, 0x5a, 0xb6, 0x67, 0x25, 0x23, 0xac, 0xe4, 0x3d, 0x81, 0x30, 0x71, 0x13, 0xca,
0x01, 0x56, 0x22, 0xb2, 0x02, 0xb1, 0x18, 0x48, 0x3d, 0x98, 0xf1, 0x83, 0x06, 0xe8, 0x3e, 0x63,
0xb4, 0x67, 0xc7, 0x0e, 0xb6, 0x02, 0x59, 0xdb, 0xe9, 0x92, 0x46, 0x5d, 0x9c, 0x4c, 0xc7, 0x6a,
0xe5, 0x99, 0x1c, 0x10, 0xe2, 0xb6, 0x5c, 0xa7, 0xef, 0x1f, 0x2c, 0xef, 0x09, 0xb0, 0xd3, 0x27,
0xe8, 0x21, 0x2c, 0xb2, 0x88, 0x12, 0xb6, 0xa6, 0xaf, 0xeb, 0x5b, 0xc5, 0xdd, 0x1b, 0xdb, 0x63,
0x85, 0xb8, 0x1d, 0x35, 0x86, 0xe3, 0xbb, 0x8c, 0xdf, 0x53, 0x50, 0x11, 0xdf, 0xa5, 0x5f, 0x16,
0xb1, 0x45, 0xa0, 0x05, 0x48, 0xb9, 0x23, 0x17, 0x33, 0x04, 0x3a, 0x48, 0x90, 0x1e, 0x4d, 0xd0,
0x68, 0xf8, 0xd3, 0xd3, 0xc3, 0x9f, 0x19, 0x0f, 0xff, 0x0d, 0x28, 0x92, 0xb3, 0x01, 0x75, 0x49,
0x8b, 0x53, 0x15, 0xde, 0x34, 0x06, 0x29, 0x7a, 0x4a, 0x2d, 0x12, 0xa9, 0xb1, 0xdc, 0xcc, 0x35,
0x16, 0x4f, 0x6a, 0x7e, 0x6a, 0x52, 0x0b, 0x49, 0x49, 0xfd, 0x49, 0x83, 0xa5, 0x58, 0x52, 0xd5,
0xc5, 0x39, 0x84, 0x2a, 0x8b, 0x07, 0xd6, 0xbb, 0x3d, 0x5e, 0x8e, 0x8c, 0x49, 0x39, 0x0a, 0xa1,
0x78, 0x6c, 0xef, 0x7c, 0x97, 0xe8, 0x0c, 0x4a, 0x8f, 0xfa, 0x43, 0x76, 0x32, 0x3f, 0xa1, 0x20,
0x48, 0x77, 0xdb, 0x8d, 0xba, 0x30, 0xaa, 0x63, 0xf1, 0x7b, 0x96, 0x94, 0x1a, 0xdf, 0x69, 0x80,
0x9a, 0x27, 0xce, 0x69, 0x93, 0xf4, 0xc4, 0x81, 0xe6, 0x76, 0x60, 0xd4, 0x58, 0x6a, 0x7a, 0xfd,
0xe8, 0x63, 0xf5, 0x63, 0x7c, 0x06, 0x4b, 0x31, 0x6f, 0x54, 0x92, 0xae, 0x03, 0x30, 0x29, 0x6a,
0xd4, 0x65, 0x7a, 0x74, 0x1c, 0x91, 0xcc, 0x17, 0xf4, 0x63, 0x58, 0x56, 0x76, 0xbc, 0x0f, 0x84,
0xcd, 0x7f, 0xf6, 0x6b, 0x50, 0x08, 0x9c, 0x51, 0x07, 0x0f, 0x05, 0xc6, 0x5f, 0x29, 0xb8, 0x3c,
0x62, 0x48, 0x1d, 0xeb, 0x5d, 0xc8, 0x78, 0xbe, 0x48, 0x53, 0xe5, 0x49, 0xa4, 0x10, 0x6c, 0xc4,
0x12, 0xed, 0x5d, 0xb2, 0x8e, 0x4b, 0x4c, 0xae, 0x2e, 0x59, 0x4a, 0x5e, 0x32, 0x29, 0x12, 0x97,
0xec, 0x06, 0x14, 0x19, 0x31, 0xfb, 0xa4, 0x2b, 0x01, 0xba, 0x04, 0x48, 0x91, 0x00, 0x6c, 0x40,
0xe9, 0xd8, 0xab, 0x37, 0x1f, 0x91, 0x16, 0x88, 0xa2, 0x92, 0x09, 0xc8, 0x63, 0xa8, 0x30, 0x6e,
0xba, 0xbc, 0x35, 0x70, 0x98, 0xc8, 0x0e, 0x5b, 0xcb, 0x24, 0x5d, 0x8b, 0xa0, 0xa9, 0x1c, 0xb0,
0xde, 0x91, 0x82, 0xe2, 0xb2, 0xd8, 0xea, 0x2f, 0x19, 0xda, 0x87, 0x45, 0x62, 0x77, 0x23, 0xaa,
0xb2, 0x33, 0xab, 0x2a, 0x11, 0xbb, 0x1b, 0x2a, 0x9a, 0x87, 0x3e, 0x0c, 0x0a, 0xab, 0x0d, 0x9b,
0x11, 0x97, 0xef, 0x51, 0xbb, 0xef, 0xf4, 0x8e, 0x4c, 0x7e, 0xf2, 0xa6, 0x72, 0xfd, 0x8b, 0x06,
0x57, 0x46, 0x6d, 0x85, 0xf9, 0xae, 0x41, 0xfe, 0x98, 0x92, 0x7e, 0x37, 0x2c, 0xe2, 0x60, 0x8d,
0xee, 0x41, 0x66, 0xe0, 0x81, 0xd7, 0x52, 0x22, 0x34, 0x93, 0x5a, 0x77, 0x93, 0xbb, 0xd4, 0xee,
0x3d, 0xa1, 0x8c, 0x63, 0x89, 0x8f, 0x84, 0x44, 0x9f, 0x3d, 0x24, 0x5f, 0x69, 0xb0, 0x2c, 0xfd,
0x7c, 0x20, 0x3b, 0xc3, 0x9b, 0x65, 0x9e, 0x84, 0x5e, 0x6e, 0x58, 0x70, 0xf9, 0x99, 0xc9, 0x3b,
0x27, 0x75, 0xeb, 0x95, 0x5d, 0xf0, 0xcc, 0x85, 0x0d, 0x4e, 0x86, 0xb0, 0x80, 0x63, 0x32, 0xe3,
0x67, 0x0d, 0x2a, 0x82, 0x63, 0x9b, 0xa4, 0xf7, 0xaf, 0x1f, 0x76, 0x84, 0xc0, 0xd2, 0xa3, 0x04,
0x66, 0xfc, 0x99, 0x82, 0xa2, 0xba, 0xea, 0x0d, 0xfb, 0xd8, 0x89, 0x57, 0x99, 0x36, 0x52, 0x65,
0xaf, 0x87, 0x6b, 0xd1, 0x26, 0x54, 0xa8, 0x28, 0x81, 0x96, 0x0a, 0x94, 0x74, 0xac, 0x80, 0xcb,
0x34, 0x5a, 0x19, 0xa2, 0xfd, 0x3a, 0x03, 0x62, 0x4b, 0xaa, 0xc8, 0x08, 0xaa, 0xc8, 0x7b, 0x82,
0x24, 0xae, 0xc9, 0x4e, 0xe5, 0x9a, 0xdc, 0x38, 0xd7, 0x5c, 0x81, 0xbc, 0x3d, 0xb4, 0x5a, 0xae,
0x73, 0xca, 0x44, 0x7b, 0xd7, 0x71, 0xce, 0x1e, 0x5a, 0xd8, 0x39, 0x65, 0xde, 0x27, 0x8b, 0x58,
0x2d, 0x46, 0xcf, 0x65, 0x5f, 0xd7, 0x71, 0xce, 0x22, 0x56, 0x93, 0x9e, 0x47, 0xd8, 0x13, 0xfe,
0x09, 0x7b, 0x1a, 0x67, 0x00, 0x4a, 0x7c, 0xc0, 0x7a, 0x73, 0x94, 0xc0, 0xfb, 0x90, 0x53, 0x99,
0x50, 0xcd, 0xe6, 0xfa, 0x64, 0xc3, 0x5e, 0x2e, 0xb1, 0x0f, 0xf7, 0x7a, 0xed, 0xca, 0x83, 0x20,
0x47, 0x9e, 0x53, 0xaf, 0xd0, 0x73, 0x56, 0x21, 0xd7, 0x6d, 0xcb, 0x79, 0x47, 0xce, 0x78, 0xd9,
0x6e, 0x5b, 0xcc, 0x43, 0x9b, 0x50, 0x09, 0x0b, 0x41, 0x02, 0x74, 0x01, 0x28, 0x87, 0x62, 0x31,
0x11, 0x7d, 0xa3, 0xc1, 0xea, 0x98, 0x3b, 0x8a, 0xa9, 0xee, 0xc9, 0xd8, 0xfa, 0xa3, 0xd0, 0x46,
0xa2, 0x43, 0x8f, 0xc9, 0xcb, 0x4f, 0xcc, 0xfe, 0x90, 0x1c, 0x99, 0xd4, 0x95, 0xd1, 0x9d, 0xb3,
0x13, 0xff, 0xaa, 0xc1, 0xe5, 0x23, 0xbf, 0x32, 0xdf, 0x76, 0x5c, 0x12, 0x06, 0xca, 0x74, 0xd2,
0x40, 0xf9, 0xb5, 0x06, 0x2b, 0xa3, 0x4e, 0xbf, 0x95, 0xe8, 0x1d, 0x40, 0xf9, 0x91, 0xd7, 0x45,
0x04, 0xbb, 0x1d, 0x10, 0x6e, 0xa2, 0x35, 0xc8, 0xa9, 0xbe, 0xa2, 0xb8, 0xc3, 0x5f, 0x7a, 0x97,
0xb1, 0x2d, 0x1a, 0x53, 0x2b, 0x6c, 0x36, 0x05, 0x5c, 0x6c, 0x87, 0xcd, 0xca, 0xf8, 0x56, 0x83,
0xaa, 0x2a, 0xdf, 0x50, 0xe3, 0xc5, 0x7c, 0xf4, 0x1f, 0x00, 0xca, 0x5a, 0xea, 0x46, 0x0b, 0xd7,
0xf3, 0xb8, 0x40, 0xd9, 0x23, 0x29, 0x40, 0x1f, 0x40, 0x56, 0xd8, 0xf7, 0x27, 0x88, 0x8d, 0x84,
0x0b, 0x13, 0x3f, 0x01, 0x56, 0x1b, 0x8c, 0x8f, 0xa1, 0x54, 0xaf, 0x3f, 0x09, 0xfd, 0x18, 0x65,
0x3e, 0x2d, 0x81, 0xf9, 0xa6, 0x9f, 0xf1, 0x8e, 0x23, 0xde, 0xac, 0x01, 0x35, 0xa0, 0x4a, 0xc0,
0xbe, 0x87, 0x8e, 0x4d, 0xaa, 0x0b, 0x68, 0x49, 0x3c, 0xb7, 0xa4, 0x80, 0x3f, 0x3c, 0xa3, 0x8c,
0x57, 0x35, 0x84, 0xa0, 0xac, 0x84, 0xfb, 0xae, 0x73, 0x4a, 0xed, 0x5e, 0x35, 0x85, 0x2e, 0xc1,
0xa2, 0xaf, 0x49, 0x50, 0x5e, 0x55, 0x8f, 0xc0, 0x54, 0x00, 0xaa, 0xe9, 0xdd, 0xdf, 0x0a, 0x50,
0xac, 0x9b, 0xdc, 0x6c, 0xca, 0x7f, 0x23, 0x90, 0x09, 0xa5, 0xe8, 0x33, 0x1e, 0xdd, 0x4a, 0x08,
0x49, 0xc2, 0x3f, 0x0d, 0xb5, 0xcd, 0xa9, 0x38, 0x59, 0x82, 0xc6, 0x02, 0xda, 0x87, 0x8c, 0xb0,
0x8f, 0x92, 0x88, 0x31, 0xfa, 0xda, 0xa8, 0x5d, 0x54, 0x65, 0xc6, 0x02, 0x6a, 0x43, 0x25, 0x78,
0x38, 0xa9, 0x84, 0xdf, 0x4c, 0x50, 0x39, 0xfe, 0x62, 0xae, 0xdd, 0x9a, 0x06, 0x0b, 0x9c, 0x6d,
0x41, 0x29, 0x32, 0xf7, 0xb3, 0x44, 0x03, 0xe3, 0xcf, 0x94, 0x44, 0x03, 0x09, 0xef, 0x07, 0x63,
0x01, 0xf5, 0xa0, 0xba, 0x4f, 0x78, 0x6c, 0x0c, 0x47, 0x9b, 0x53, 0x3a, 0x86, 0xcf, 0x42, 0xb5,
0xad, 0xe9, 0xc0, 0xc0, 0x90, 0x0b, 0xcb, 0xfb, 0x84, 0x8f, 0xcd, 0x80, 0xe8, 0x4e, 0x82, 0x8e,
0x09, 0x53, 0x69, 0xed, 0x7f, 0x33, 0x60, 0xa3, 0x36, 0x4d, 0xb8, 0x14, 0xd8, 0x0c, 0xba, 0xf6,
0xe6, 0x44, 0x25, 0xf1, 0x79, 0xab, 0x36, 0x7d, 0xd4, 0x14, 0xc7, 0x5a, 0xdd, 0x27, 0x3c, 0xde,
0x2e, 0x28, 0xe3, 0xb4, 0xc3, 0xd0, 0xed, 0x04, 0x43, 0xc9, 0x6d, 0xae, 0x76, 0x67, 0x16, 0x68,
0x70, 0x2c, 0x07, 0x56, 0xf6, 0x09, 0x8f, 0x71, 0xac, 0x32, 0x99, 0x94, 0x90, 0xc4, 0x06, 0x52,
0xbb, 0x3d, 0x03, 0x32, 0x30, 0xf8, 0x1c, 0x90, 0x38, 0xa4, 0x35, 0x70, 0xec, 0xb0, 0x4c, 0x6a,
0x89, 0xd7, 0xe3, 0xa1, 0x35, 0xe0, 0x2f, 0x47, 0x0b, 0x30, 0x88, 0xdd, 0x88, 0x0e, 0x63, 0x01,
0x3d, 0x13, 0xba, 0xbd, 0x71, 0xe7, 0x29, 0xed, 0x7c, 0xae, 0x52, 0x70, 0xa1, 0xee, 0xff, 0xc6,
0xbf, 0xa9, 0x85, 0xcc, 0x4a, 0xc4, 0xe9, 0x4f, 0x45, 0xc1, 0x85, 0xc1, 0x79, 0x7d, 0xaa, 0x77,
0x7f, 0x4c, 0x41, 0xde, 0x63, 0x2d, 0x41, 0x51, 0x6f, 0x32, 0x38, 0xcf, 0xa1, 0x12, 0x7f, 0x0b,
0x24, 0xa7, 0x38, 0xf1, 0xbd, 0x30, 0x8d, 0xbe, 0x30, 0x2c, 0xfa, 0x73, 0xbf, 0xe4, 0x16, 0x63,
0x12, 0x1f, 0x86, 0x2f, 0x83, 0x29, 0x3a, 0xf7, 0x3e, 0x7a, 0xfe, 0x61, 0x8f, 0xf2, 0x93, 0x61,
0xdb, 0xfb, 0xb2, 0x73, 0x4e, 0xfb, 0x7d, 0x7a, 0xce, 0x49, 0xe7, 0x64, 0x47, 0xee, 0xfa, 0x7f,
0x97, 0x32, 0xee, 0xd2, 0xf6, 0x90, 0x93, 0xee, 0x8e, 0x7f, 0xec, 0x1d, 0xa1, 0x6a, 0xc7, 0x33,
0x37, 0x68, 0xb7, 0xb3, 0x62, 0x75, 0xf7, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x78, 0xd6, 0xcd,
0x18, 0xa5, 0x16, 0x00, 0x00,
// 1574 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x5f, 0x6f, 0x1b, 0x45,
0x10, 0xcf, 0xf9, 0xfc, 0x77, 0xec, 0xd8, 0xee, 0xe6, 0x9f, 0xeb, 0x96, 0x36, 0x39, 0xd4, 0x26,
0xad, 0x20, 0x41, 0xa9, 0xa0, 0xf0, 0x82, 0x68, 0xea, 0x34, 0xb2, 0xda, 0x44, 0xd1, 0xba, 0x50,
0x91, 0x17, 0xeb, 0x6c, 0x6f, 0x9c, 0x05, 0xdf, 0x9d, 0xb9, 0x5d, 0x37, 0x69, 0x5e, 0xe0, 0x09,
0x24, 0x84, 0x54, 0x9e, 0x78, 0x80, 0x67, 0xbe, 0x00, 0x5f, 0x80, 0xaf, 0xc0, 0xe7, 0xe1, 0x01,
0xa1, 0xdb, 0x5d, 0xdf, 0x1f, 0xfb, 0x12, 0x1b, 0xb7, 0xa5, 0x6f, 0xde, 0xb9, 0xdf, 0xce, 0xcc,
0xce, 0xcc, 0xfe, 0x66, 0xd6, 0x80, 0x3a, 0x26, 0x37, 0x9b, 0x8c, 0xb8, 0xcf, 0x69, 0x9b, 0x6c,
0xf6, 0x5d, 0x87, 0x3b, 0xe8, 0x8a, 0x45, 0x7b, 0xcf, 0x07, 0x4c, 0xae, 0x36, 0x3d, 0x40, 0xb5,
0xd0, 0x76, 0x2c, 0xcb, 0xb1, 0xa5, 0xa8, 0x5a, 0xa4, 0x36, 0x27, 0xae, 0x6d, 0xf6, 0xd4, 0xba,
0x10, 0xde, 0x60, 0x7c, 0x0b, 0x0b, 0x98, 0x74, 0x29, 0xe3, 0xc4, 0x3d, 0x70, 0x3a, 0x04, 0x93,
0x6f, 0x06, 0x84, 0x71, 0xf4, 0x01, 0x24, 0x5b, 0x26, 0x23, 0x15, 0x6d, 0x55, 0xdb, 0xc8, 0x6f,
0x5f, 0xdf, 0x8c, 0x18, 0x51, 0xea, 0xf7, 0x59, 0x77, 0xc7, 0x64, 0x04, 0x0b, 0x24, 0xfa, 0x08,
0x32, 0x66, 0xa7, 0xe3, 0x12, 0xc6, 0x2a, 0x89, 0x4b, 0x36, 0x3d, 0x90, 0x18, 0x3c, 0x04, 0x1b,
0x2f, 0x35, 0x58, 0x8c, 0x7a, 0xc0, 0xfa, 0x8e, 0xcd, 0x08, 0xda, 0x81, 0x3c, 0xb5, 0x29, 0x6f,
0xf6, 0x4d, 0xd7, 0xb4, 0x98, 0xf2, 0x64, 0x2d, 0xaa, 0xd4, 0x3f, 0x5a, 0xdd, 0xa6, 0xfc, 0x50,
0x00, 0x31, 0x50, 0xff, 0x37, 0xba, 0x07, 0x69, 0xc6, 0x4d, 0x3e, 0x18, 0xfa, 0x74, 0x2d, 0xd6,
0xa7, 0x86, 0x80, 0x60, 0x05, 0x35, 0xfe, 0xd2, 0xa0, 0xd0, 0x20, 0xdd, 0x7a, 0x6d, 0x18, 0x8c,
0x45, 0x48, 0xb5, 0x9d, 0x81, 0xcd, 0x85, 0x0f, 0xf3, 0x58, 0x2e, 0xd0, 0x2a, 0xe4, 0xdb, 0x27,
0xa6, 0x6d, 0x93, 0xde, 0x81, 0x69, 0x11, 0x61, 0x20, 0x87, 0xc3, 0x22, 0x64, 0x40, 0xa1, 0xed,
0xf4, 0x7a, 0xa4, 0xcd, 0xa9, 0x63, 0xd7, 0x6b, 0x15, 0x7d, 0x55, 0xdb, 0xd0, 0x71, 0x44, 0xe6,
0x69, 0xe9, 0x9b, 0x2e, 0xa7, 0x0a, 0x92, 0x14, 0x90, 0xb0, 0x08, 0x5d, 0x83, 0x9c, 0xb7, 0xa3,
0x69, 0x7b, 0x56, 0x52, 0xc2, 0x4a, 0xd6, 0x13, 0x08, 0x13, 0xb7, 0xa0, 0xe8, 0x63, 0x25, 0x22,
0x2d, 0x10, 0xf3, 0xbe, 0xd4, 0x83, 0x19, 0x3f, 0x6b, 0x80, 0x1e, 0x30, 0x46, 0xbb, 0x76, 0xe4,
0x60, 0xcb, 0x90, 0xb6, 0x9d, 0x0e, 0xa9, 0xd7, 0xc4, 0xc9, 0x74, 0xac, 0x56, 0x9e, 0xc9, 0x3e,
0x21, 0x6e, 0xd3, 0x75, 0x7a, 0xc3, 0x83, 0x65, 0x3d, 0x01, 0x76, 0x7a, 0x04, 0xed, 0xc2, 0x3c,
0x0b, 0x29, 0x61, 0x15, 0x7d, 0x55, 0xdf, 0xc8, 0x6f, 0xdf, 0xdc, 0x1c, 0x2b, 0xc4, 0xcd, 0xb0,
0x31, 0x1c, 0xdd, 0x65, 0xfc, 0x99, 0x80, 0x92, 0xf8, 0x2e, 0xfd, 0xb2, 0x88, 0x2d, 0x02, 0x2d,
0x40, 0xca, 0x1d, 0xb9, 0x98, 0x22, 0xd0, 0x7e, 0x82, 0xf4, 0x70, 0x82, 0x46, 0xc3, 0x9f, 0x9c,
0x1c, 0xfe, 0xd4, 0x78, 0xf8, 0x6f, 0x42, 0x9e, 0x9c, 0xf5, 0xa9, 0x4b, 0x9a, 0x9c, 0xaa, 0xf0,
0x26, 0x31, 0x48, 0xd1, 0x53, 0x6a, 0x91, 0x50, 0x8d, 0x65, 0xa6, 0xae, 0xb1, 0x68, 0x52, 0xb3,
0x13, 0x93, 0x9a, 0x8b, 0x4b, 0xea, 0xaf, 0x1a, 0x2c, 0x44, 0x92, 0xaa, 0x2e, 0xce, 0x01, 0x94,
0x59, 0x34, 0xb0, 0xde, 0xed, 0xf1, 0x72, 0x64, 0x5c, 0x94, 0xa3, 0x00, 0x8a, 0xc7, 0xf6, 0xce,
0x76, 0x89, 0xce, 0xa0, 0xf0, 0xa8, 0x37, 0x60, 0x27, 0xb3, 0x13, 0x0a, 0x82, 0x64, 0xa7, 0x55,
0xaf, 0x09, 0xa3, 0x3a, 0x16, 0xbf, 0xa7, 0x49, 0xa9, 0xf1, 0x93, 0x06, 0xa8, 0x71, 0xe2, 0x9c,
0x36, 0x48, 0x57, 0x1c, 0x68, 0x66, 0x07, 0x46, 0x8d, 0x25, 0x26, 0xd7, 0x8f, 0x3e, 0x56, 0x3f,
0xc6, 0x57, 0xb0, 0x10, 0xf1, 0x46, 0x25, 0xe9, 0x06, 0x00, 0x93, 0xa2, 0x7a, 0x4d, 0xa6, 0x47,
0xc7, 0x21, 0xc9, 0x6c, 0x41, 0x3f, 0x86, 0x45, 0x65, 0xc7, 0xfb, 0x40, 0xd8, 0xec, 0x67, 0xbf,
0x0e, 0x39, 0xdf, 0x19, 0x75, 0xf0, 0x40, 0x60, 0xfc, 0x93, 0x80, 0xa5, 0x11, 0x43, 0xea, 0x58,
0x1f, 0x42, 0xca, 0xf3, 0x45, 0x9a, 0x2a, 0x5e, 0x44, 0x0a, 0xfe, 0x46, 0x2c, 0xd1, 0xde, 0x25,
0x6b, 0xbb, 0xc4, 0xe4, 0xea, 0x92, 0x25, 0xe4, 0x25, 0x93, 0x22, 0x71, 0xc9, 0x6e, 0x42, 0x9e,
0x11, 0xb3, 0x47, 0x3a, 0x12, 0xa0, 0x4b, 0x80, 0x14, 0x09, 0xc0, 0x1a, 0x14, 0x8e, 0xbd, 0x7a,
0x1b, 0x22, 0x92, 0x02, 0x91, 0x57, 0x32, 0x01, 0x79, 0x0c, 0x25, 0xc6, 0x4d, 0x97, 0x37, 0xfb,
0x0e, 0x13, 0xd9, 0x61, 0x95, 0x54, 0xdc, 0xb5, 0xf0, 0x9b, 0xca, 0x3e, 0xeb, 0x1e, 0x2a, 0x28,
0x2e, 0x8a, 0xad, 0xc3, 0x25, 0x43, 0x7b, 0x30, 0x4f, 0xec, 0x4e, 0x48, 0x55, 0x7a, 0x6a, 0x55,
0x05, 0x62, 0x77, 0x02, 0x45, 0xb3, 0xd0, 0x87, 0x41, 0x61, 0xa5, 0x6e, 0x33, 0xe2, 0xf2, 0x1d,
0x6a, 0xf7, 0x9c, 0xee, 0xa1, 0xc9, 0x4f, 0xde, 0x54, 0xae, 0x7f, 0xd7, 0xe0, 0xea, 0xa8, 0xad,
0x20, 0xdf, 0x55, 0xc8, 0x1e, 0x53, 0xd2, 0xeb, 0x04, 0x45, 0xec, 0xaf, 0xd1, 0x7d, 0x48, 0xf5,
0x3d, 0x70, 0x25, 0x21, 0x42, 0x73, 0x51, 0xeb, 0x6e, 0x70, 0x97, 0xda, 0xdd, 0x27, 0x94, 0x71,
0x2c, 0xf1, 0xa1, 0x90, 0xe8, 0xd3, 0x87, 0xe4, 0x3b, 0x0d, 0x16, 0xa5, 0x9f, 0x0f, 0x65, 0x67,
0x78, 0xb3, 0xcc, 0x13, 0xd3, 0xcb, 0x0d, 0x0b, 0x96, 0x9e, 0x99, 0xbc, 0x7d, 0x52, 0xb3, 0x5e,
0xd9, 0x05, 0xcf, 0x5c, 0xd0, 0xe0, 0x64, 0x08, 0x73, 0x38, 0x22, 0x33, 0x7e, 0xd3, 0xa0, 0x24,
0x38, 0xb6, 0x41, 0xba, 0xff, 0xfb, 0x61, 0x47, 0x08, 0x2c, 0x39, 0x4a, 0x60, 0xc6, 0xdf, 0x3a,
0xe4, 0xd5, 0x55, 0xaf, 0xdb, 0xc7, 0x4e, 0xb4, 0xca, 0xb4, 0x91, 0x2a, 0x7b, 0x3d, 0x5c, 0x8b,
0xd6, 0xa1, 0x44, 0x45, 0x09, 0x34, 0x55, 0xa0, 0xa4, 0x63, 0x39, 0x5c, 0xa4, 0xe1, 0xca, 0x10,
0xed, 0xd7, 0xe9, 0x13, 0x5b, 0x52, 0x45, 0x4a, 0x50, 0x45, 0xd6, 0x13, 0xc4, 0x71, 0x4d, 0x7a,
0x22, 0xd7, 0x64, 0xc6, 0xb9, 0xe6, 0x2a, 0x64, 0xed, 0x81, 0xd5, 0x74, 0x9d, 0x53, 0x26, 0xda,
0xbb, 0x8e, 0x33, 0xf6, 0xc0, 0xc2, 0xce, 0x29, 0xf3, 0x3e, 0x59, 0xc4, 0x6a, 0x32, 0x7a, 0x2e,
0xfb, 0xba, 0x8e, 0x33, 0x16, 0xb1, 0x1a, 0xf4, 0x3c, 0xc4, 0x9e, 0xf0, 0x9f, 0xd8, 0xb3, 0x0e,
0xc5, 0x28, 0xb1, 0x55, 0xf2, 0x53, 0x93, 0xd1, 0x7c, 0x84, 0xd7, 0xd0, 0x2e, 0x14, 0xc2, 0xb4,
0x56, 0x29, 0x4c, 0xad, 0x28, 0x1f, 0x62, 0x35, 0xe3, 0x0c, 0x40, 0x39, 0xba, 0xcf, 0xba, 0x33,
0x14, 0xe5, 0xc7, 0x90, 0x51, 0xb5, 0xa1, 0xda, 0xdf, 0x8d, 0x8b, 0x43, 0xe1, 0x55, 0x17, 0x1e,
0xc2, 0xbd, 0xee, 0xbf, 0xfc, 0xd0, 0xaf, 0x1a, 0x2f, 0x4c, 0xaf, 0xd0, 0x05, 0x57, 0x20, 0xd3,
0x69, 0xc9, 0x09, 0x4c, 0x4e, 0x9d, 0xe9, 0x4e, 0x4b, 0x4c, 0x68, 0xeb, 0x50, 0x0a, 0x4a, 0x53,
0x02, 0x74, 0x01, 0x28, 0x06, 0x62, 0x31, 0xa3, 0xfd, 0xa0, 0xc1, 0xca, 0x98, 0x3b, 0x8a, 0x3b,
0xef, 0xcb, 0x6c, 0x0f, 0x87, 0xb3, 0xb5, 0x58, 0x87, 0x1e, 0x93, 0x17, 0x5f, 0x98, 0xbd, 0x01,
0x39, 0x34, 0xa9, 0x2b, 0xf3, 0x3d, 0xe3, 0x6c, 0xf0, 0x87, 0x06, 0x4b, 0x87, 0xc3, 0xbb, 0xf2,
0xb6, 0xe3, 0x12, 0x33, 0xe2, 0x26, 0xe3, 0x46, 0xdc, 0xef, 0x35, 0x58, 0x1e, 0x75, 0xfa, 0xad,
0x44, 0x6f, 0x1f, 0x8a, 0x8f, 0xbc, 0xbe, 0x26, 0xf8, 0x76, 0x9f, 0x70, 0x13, 0x55, 0x20, 0xa3,
0x3a, 0x9d, 0x62, 0xb3, 0xe1, 0xd2, 0xa3, 0x87, 0x96, 0x68, 0x95, 0xcd, 0xa0, 0xfd, 0xe5, 0x70,
0xbe, 0x15, 0xb4, 0x4f, 0xe3, 0x47, 0x0d, 0xca, 0xaa, 0x7c, 0x03, 0x8d, 0x97, 0x33, 0xe4, 0x3b,
0x00, 0x94, 0x35, 0x15, 0xc7, 0x08, 0xd7, 0xb3, 0x38, 0x47, 0xd9, 0x23, 0x29, 0x40, 0x9f, 0x40,
0x5a, 0xd8, 0x1f, 0xce, 0x34, 0x6b, 0x31, 0x17, 0x26, 0x7a, 0x02, 0xac, 0x36, 0x18, 0x9f, 0x43,
0xa1, 0x56, 0x7b, 0x12, 0xf8, 0x31, 0xca, 0xc5, 0x5a, 0x0c, 0x17, 0x4f, 0x3e, 0xe3, 0x5d, 0x47,
0xbc, 0xa2, 0x7d, 0xb2, 0x42, 0x25, 0xbf, 0x1f, 0x1c, 0x38, 0x36, 0x29, 0xcf, 0xa1, 0x05, 0xf1,
0x00, 0x94, 0x02, 0xbe, 0x7b, 0x46, 0x19, 0x2f, 0x6b, 0x08, 0x41, 0x51, 0x09, 0xf7, 0x5c, 0xe7,
0x94, 0xda, 0xdd, 0x72, 0x02, 0x5d, 0x81, 0xf9, 0xa1, 0x26, 0x41, 0xc2, 0x65, 0x3d, 0x04, 0x53,
0x01, 0x28, 0x27, 0xb7, 0x5f, 0x02, 0xe4, 0x6b, 0x26, 0x37, 0x1b, 0xf2, 0xff, 0x11, 0x64, 0x42,
0x21, 0xfc, 0xc7, 0x02, 0xba, 0x1d, 0x13, 0x92, 0x98, 0xff, 0x3e, 0xaa, 0xeb, 0x13, 0x71, 0xb2,
0x04, 0x8d, 0x39, 0xb4, 0x07, 0x29, 0x61, 0x1f, 0xc5, 0x51, 0x75, 0xf8, 0xfd, 0x53, 0xbd, 0xac,
0xca, 0x8c, 0x39, 0xd4, 0x82, 0x92, 0xff, 0x94, 0x53, 0x09, 0xbf, 0x15, 0xa3, 0x72, 0xfc, 0x0d,
0x5f, 0xbd, 0x3d, 0x09, 0xe6, 0x3b, 0xdb, 0x84, 0x42, 0xe8, 0x25, 0xc2, 0x62, 0x0d, 0x8c, 0x3f,
0x9c, 0x62, 0x0d, 0xc4, 0xbc, 0x68, 0x8c, 0x39, 0xd4, 0x85, 0xf2, 0x1e, 0xe1, 0x91, 0x87, 0x01,
0x5a, 0x9f, 0xd0, 0xc3, 0x86, 0x2c, 0x54, 0xdd, 0x98, 0x0c, 0xf4, 0x0d, 0xb9, 0xb0, 0xb8, 0x47,
0xf8, 0xd8, 0x54, 0x8a, 0xee, 0xc6, 0xe8, 0xb8, 0x60, 0x4e, 0xae, 0xbe, 0x37, 0x05, 0x36, 0x6c,
0xd3, 0x84, 0x2b, 0xbe, 0x4d, 0x7f, 0x8e, 0x58, 0xbf, 0x50, 0x49, 0x74, 0x02, 0xac, 0x4e, 0x1e,
0x7e, 0xc5, 0xb1, 0x56, 0xf6, 0x08, 0x8f, 0xb6, 0x0b, 0xca, 0x38, 0x6d, 0x33, 0x74, 0x27, 0xc6,
0x50, 0x7c, 0x9b, 0xab, 0xde, 0x9d, 0x06, 0xea, 0x1f, 0xcb, 0x81, 0xe5, 0x3d, 0xc2, 0x23, 0x1c,
0xab, 0x4c, 0xc6, 0x25, 0x24, 0xb6, 0x81, 0x54, 0xef, 0x4c, 0x81, 0xf4, 0x0d, 0x1e, 0x01, 0x12,
0x87, 0xb4, 0xfa, 0x8e, 0x1d, 0x94, 0x49, 0x35, 0xf6, 0x7a, 0xec, 0x5a, 0x7d, 0xfe, 0x62, 0xb4,
0x00, 0xfd, 0xd8, 0x8d, 0xe8, 0x30, 0xe6, 0xd0, 0x33, 0xa1, 0xdb, 0x1b, 0xc0, 0x9e, 0xd2, 0xf6,
0xd7, 0x2a, 0x05, 0x97, 0xea, 0x7e, 0x37, 0xfa, 0x4d, 0x2d, 0x64, 0x56, 0x42, 0x4e, 0x7f, 0x29,
0x0a, 0x2e, 0x08, 0xce, 0x6b, 0x54, 0x7d, 0x04, 0x4b, 0xc1, 0xa5, 0xf1, 0x66, 0x99, 0xd7, 0xa7,
0x7b, 0xfb, 0x97, 0x04, 0x64, 0x3d, 0x46, 0x14, 0xf4, 0xf7, 0x26, 0x03, 0x7f, 0x04, 0xa5, 0xe8,
0xcb, 0x27, 0xbe, 0x7c, 0x62, 0x5f, 0x47, 0x93, 0xa8, 0x11, 0xc3, 0xfc, 0xf0, 0x95, 0x23, 0x79,
0xcb, 0xb8, 0x88, 0x6b, 0x83, 0x77, 0xd0, 0x04, 0x9d, 0x3b, 0x9f, 0x1d, 0x7d, 0xda, 0xa5, 0xfc,
0x64, 0xd0, 0xf2, 0xbe, 0x6c, 0x9d, 0xd3, 0x5e, 0x8f, 0x9e, 0x73, 0xd2, 0x3e, 0xd9, 0x92, 0xbb,
0xde, 0xef, 0x50, 0xc6, 0x5d, 0xda, 0x1a, 0x70, 0xd2, 0xd9, 0x1a, 0x1e, 0x7b, 0x4b, 0xa8, 0xda,
0xf2, 0xcc, 0xf5, 0x5b, 0xad, 0xb4, 0x58, 0xdd, 0xfb, 0x37, 0x00, 0x00, 0xff, 0xff, 0x2f, 0x5b,
0x6e, 0xda, 0x93, 0x17, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -1672,6 +1690,7 @@ type DataServiceClient interface {
GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*internalpb2.ComponentStates, error)
GetTimeTickChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error)
GetStatisticsChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error)
GetSegmentInfoChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error)
}
type dataServiceClient struct {
@ -1790,6 +1809,15 @@ func (c *dataServiceClient) GetStatisticsChannel(ctx context.Context, in *common
return out, nil
}
func (c *dataServiceClient) GetSegmentInfoChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) {
out := new(milvuspb.StringResponse)
err := c.cc.Invoke(ctx, "/milvus.proto.data.DataService/GetSegmentInfoChannel", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// DataServiceServer is the server API for DataService service.
type DataServiceServer interface {
RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error)
@ -1804,6 +1832,7 @@ type DataServiceServer interface {
GetComponentStates(context.Context, *commonpb.Empty) (*internalpb2.ComponentStates, error)
GetTimeTickChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error)
GetStatisticsChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error)
GetSegmentInfoChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error)
}
// UnimplementedDataServiceServer can be embedded to have forward compatible implementations.
@ -1846,6 +1875,9 @@ func (*UnimplementedDataServiceServer) GetTimeTickChannel(ctx context.Context, r
func (*UnimplementedDataServiceServer) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetStatisticsChannel not implemented")
}
func (*UnimplementedDataServiceServer) GetSegmentInfoChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetSegmentInfoChannel not implemented")
}
func RegisterDataServiceServer(s *grpc.Server, srv DataServiceServer) {
s.RegisterService(&_DataService_serviceDesc, srv)
@ -2067,6 +2099,24 @@ func _DataService_GetStatisticsChannel_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler)
}
func _DataService_GetSegmentInfoChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(commonpb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DataServiceServer).GetSegmentInfoChannel(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/milvus.proto.data.DataService/GetSegmentInfoChannel",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DataServiceServer).GetSegmentInfoChannel(ctx, req.(*commonpb.Empty))
}
return interceptor(ctx, in, info, handler)
}
var _DataService_serviceDesc = grpc.ServiceDesc{
ServiceName: "milvus.proto.data.DataService",
HandlerType: (*DataServiceServer)(nil),
@ -2119,6 +2169,10 @@ var _DataService_serviceDesc = grpc.ServiceDesc{
MethodName: "GetStatisticsChannel",
Handler: _DataService_GetStatisticsChannel_Handler,
},
{
MethodName: "GetSegmentInfoChannel",
Handler: _DataService_GetSegmentInfoChannel_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "data_service.proto",