diff --git a/cmd/master/main.go b/cmd/master/main.go index e0814ea920..67600af4d5 100644 --- a/cmd/master/main.go +++ b/cmd/master/main.go @@ -20,8 +20,25 @@ func main() { etcdAddress, _ := masterParams.Params.EtcdAddress() etcdRootPath, _ := masterParams.Params.EtcdRootPath() + pulsarAddr, _ := masterParams.Params.PulsarAddress() - svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, []string{etcdAddress}) + opt := master.Option{ + KVRootPath: etcdRootPath, + MetaRootPath: etcdRootPath, + EtcdAddr: []string{etcdAddress}, + PulsarAddr: pulsarAddr, + ProxyIDs: nil, + PulsarProxyChannels: nil, + PulsarProxySubName: "", + SoftTTBInterval: 0, + WriteIDs: nil, + PulsarWriteChannels: nil, + PulsarWriteSubName: "", + PulsarDMChannels: nil, + PulsarK2SChannels: nil, + } + + svr, err := master.CreateServer(ctx, &opt) if err != nil { log.Print("create server failed", zap.Error(err)) } diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go index 955551b9a7..1e4cff3a78 100644 --- a/internal/master/collection_task_test.go +++ b/internal/master/collection_task_test.go @@ -13,6 +13,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" ) @@ -30,7 +31,23 @@ func TestMaster_CollectionTask(t *testing.T) { _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) + opt := Option{ + KVRootPath: "/test/root/kv", + MetaRootPath: "/test/root/meta", + EtcdAddr: []string{etcdAddr}, + PulsarAddr: "pulsar://localhost:6650", + ProxyIDs: []typeutil.UniqueID{1, 2}, + PulsarProxyChannels: []string{"proxy1", "proxy2"}, + PulsarProxySubName: "proxyTopics", + SoftTTBInterval: 300, + WriteIDs: []typeutil.UniqueID{3, 4}, + PulsarWriteChannels: []string{"write3", "write4"}, + PulsarWriteSubName: "writeTopics", + PulsarDMChannels: []string{"dm0", "dm1"}, + PulsarK2SChannels: []string{"k2s0", "k2s1"}, + } + + svr, err := CreateServer(ctx, &opt) assert.Nil(t, err) err = svr.Run(10002) assert.Nil(t, err) diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index 45b1523745..7b2bc44026 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -11,6 +11,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" ) @@ -30,7 +31,23 @@ func TestMaster_CreateCollection(t *testing.T) { _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) + opt := Option{ + KVRootPath: "/test/root/kv", + MetaRootPath: "/test/root/meta", + EtcdAddr: []string{etcdAddr}, + PulsarAddr: "pulsar://localhost:6650", + ProxyIDs: []typeutil.UniqueID{1, 2}, + PulsarProxyChannels: []string{"proxy1", "proxy2"}, + PulsarProxySubName: "proxyTopics", + SoftTTBInterval: 300, + WriteIDs: []typeutil.UniqueID{3, 4}, + PulsarWriteChannels: []string{"write3", "write4"}, + PulsarWriteSubName: "writeTopics", + PulsarDMChannels: []string{"dm0", "dm1"}, + PulsarK2SChannels: []string{"k2s0", "k2s1"}, + } + + svr, err := CreateServer(ctx, &opt) assert.Nil(t, err) err = svr.Run(10001) assert.Nil(t, err) diff --git a/internal/master/master.go b/internal/master/master.go index 9fb054a729..b0bbe3a547 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -22,12 +22,38 @@ import ( "github.com/zilliztech/milvus-distributed/internal/master/id" "github.com/zilliztech/milvus-distributed/internal/master/informer" masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" + "github.com/zilliztech/milvus-distributed/internal/master/timesync" "github.com/zilliztech/milvus-distributed/internal/master/tso" + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) // Server is the pd server. + +type Option struct { + KVRootPath string + MetaRootPath string + EtcdAddr []string + + PulsarAddr string + + ////softTimeTickBarrier + ProxyIDs []typeutil.UniqueID + PulsarProxyChannels []string //TimeTick + PulsarProxySubName string + SoftTTBInterval Timestamp //Physical Time + Logical Time + + //hardTimeTickBarrier + WriteIDs []typeutil.UniqueID + PulsarWriteChannels []string + PulsarWriteSubName string + + PulsarDMChannels []string + PulsarK2SChannels []string +} + type Master struct { // Server state. isServing int64 @@ -54,6 +80,7 @@ type Master struct { kvBase *kv.EtcdKV scheduler *ddRequestScheduler mt *metaTable + tsmp timesync.MsgProducer // tso ticker tsTicker *time.Ticker @@ -89,25 +116,57 @@ func Init() { } // CreateServer creates the UNINITIALIZED pd server with given configuration. -func CreateServer(ctx context.Context, kvRootPath, metaRootPath string, etcdAddr []string) (*Master, error) { +func CreateServer(ctx context.Context, opt *Option) (*Master, error) { //Init(etcdAddr, kvRootPath) - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr}) + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: opt.EtcdAddr}) if err != nil { return nil, err } - etcdkv := kv.NewEtcdKV(etcdClient, metaRootPath) + etcdkv := kv.NewEtcdKV(etcdClient, opt.MetaRootPath) metakv, err := NewMetaTable(etcdkv) if err != nil { return nil, err } + //timeSyncMsgProducer + tsmp, err := timesync.NewTimeSyncMsgProducer(ctx) + if err != nil { + return nil, err + } + pulsarProxyStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream + pulsarProxyStream.SetPulsarCient(opt.PulsarAddr) + pulsarProxyStream.CreatePulsarConsumers(opt.PulsarProxyChannels, opt.PulsarProxySubName, ms.NewUnmarshalDispatcher(), 1024) + pulsarProxyStream.Start() + var proxyStream ms.MsgStream = pulsarProxyStream + proxyTimeTickBarrier := timesync.NewSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval) + tsmp.SetProxyTtBarrier(proxyTimeTickBarrier) + + pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream + pulsarWriteStream.SetPulsarCient(opt.PulsarAddr) + pulsarWriteStream.CreatePulsarConsumers(opt.PulsarWriteChannels, opt.PulsarWriteSubName, ms.NewUnmarshalDispatcher(), 1024) + pulsarWriteStream.Start() + var writeStream ms.MsgStream = pulsarWriteStream + writeTimeTickBarrier := timesync.NewHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs) + tsmp.SetWriteNodeTtBarrier(writeTimeTickBarrier) + + pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream + pulsarDMStream.SetPulsarCient(opt.PulsarAddr) + pulsarDMStream.CreatePulsarProducers(opt.PulsarDMChannels) + tsmp.SetDMSyncStream(pulsarDMStream) + + pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream + pulsarK2SStream.SetPulsarCient(opt.PulsarAddr) + pulsarK2SStream.CreatePulsarProducers(opt.PulsarK2SChannels) + tsmp.SetK2sSyncStream(pulsarK2SStream) + m := &Master{ ctx: ctx, startTimestamp: time.Now().Unix(), - kvBase: newKVBase(kvRootPath, etcdAddr), + kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr), scheduler: NewDDRequestScheduler(), mt: metakv, + tsmp: tsmp, ssChan: make(chan internalpb.SegmentStats, 10), grpcErr: make(chan error), pc: informer.NewPulsarClient(), @@ -189,6 +248,11 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx) //go s.Se + s.serverLoopWg.Add(1) + if err := s.tsmp.Start(); err != nil { + return err + } + s.serverLoopWg.Add(1) go s.grpcLoop(grpcPort) @@ -209,6 +273,9 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error { } func (s *Master) stopServerLoop() { + s.tsmp.Close() + s.serverLoopWg.Done() + if s.grpcServer != nil { s.grpcServer.GracefulStop() log.Printf("server is closed, exit grpc server") diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index ff59c1be51..b337a5a99a 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -14,6 +14,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" ) @@ -34,8 +35,24 @@ func TestMaster_Partition(t *testing.T) { _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) + opt := Option{ + KVRootPath: "/test/root/kv", + MetaRootPath: "/test/root/meta", + EtcdAddr: []string{etcdAddr}, + PulsarAddr: "pulsar://localhost:6650", + ProxyIDs: []typeutil.UniqueID{1, 2}, + PulsarProxyChannels: []string{"proxy1", "proxy2"}, + PulsarProxySubName: "proxyTopics", + SoftTTBInterval: 300, + WriteIDs: []typeutil.UniqueID{3, 4}, + PulsarWriteChannels: []string{"write3", "write4"}, + PulsarWriteSubName: "writeTopics", + PulsarDMChannels: []string{"dm0", "dm1"}, + PulsarK2SChannels: []string{"k2s0", "k2s1"}, + } + port := 10000 + rand.Intn(1000) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) + svr, err := CreateServer(ctx, &opt) assert.Nil(t, err) err = svr.Run(int64(port)) assert.Nil(t, err) diff --git a/internal/master/timesync/timetick.go b/internal/master/timesync/timetick.go index 715f78f4e9..61e3db24ef 100644 --- a/internal/master/timesync/timetick.go +++ b/internal/master/timesync/timetick.go @@ -1,12 +1,24 @@ package timesync -import "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +import ( + ms "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" +) type ( UniqueID = typeutil.UniqueID Timestamp = typeutil.Timestamp ) +type MsgProducer interface { + SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier) + SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier) + SetDMSyncStream(dmSync ms.MsgStream) + SetK2sSyncStream(k2sSync ms.MsgStream) + Start() error + Close() +} + type TimeTickBarrier interface { GetTimeTick() (Timestamp, error) Start() error diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 163b8529f2..e4a65330dd 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -21,6 +21,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) var ctx context.Context @@ -48,7 +49,23 @@ func startMaster(ctx context.Context) { kvRootPath := path.Join(rootPath, "kv") metaRootPath := path.Join(rootPath, "meta") - svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, []string{etcdAddr}) + opt := master.Option{ + KVRootPath: kvRootPath, + MetaRootPath: metaRootPath, + EtcdAddr: []string{etcdAddr}, + PulsarAddr: "pulsar://localhost:6650", + ProxyIDs: []typeutil.UniqueID{1, 2}, + PulsarProxyChannels: []string{"proxy1", "proxy2"}, + PulsarProxySubName: "proxyTopics", + SoftTTBInterval: 300, + WriteIDs: []typeutil.UniqueID{3, 4}, + PulsarWriteChannels: []string{"write3", "write4"}, + PulsarWriteSubName: "writeTopics", + PulsarDMChannels: []string{"dm0", "dm1"}, + PulsarK2SChannels: []string{"k2s0", "k2s1"}, + } + + svr, err := master.CreateServer(ctx, &opt) masterServer = svr if err != nil { log.Print("create server failed", zap.Error(err))