From bdf84f08ab80ee7f245e3eed223601dd5cc1ff74 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Fri, 29 Jan 2021 14:40:19 +0800 Subject: [PATCH] Add watcher_test and fix some bugs Signed-off-by: neza2017 --- cmd/masterservice/main.go | 3 + internal/datanode/allocator.go | 2 +- internal/datanode/data_node_test.go | 47 ------- internal/datanode/meta_table.go | 125 ++++++++++--------- internal/datanode/meta_table_test.go | 73 +++++------ internal/distributed/proxynode/service.go | 2 +- internal/distributed/proxyservice/service.go | 2 +- internal/masterservice/master_service.go | 7 +- internal/proxyservice/impl.go | 3 +- scripts/run_go_unittest.sh | 2 +- 10 files changed, 120 insertions(+), 146 deletions(-) diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go index e1886b77aa..a683e4a28f 100644 --- a/cmd/masterservice/main.go +++ b/cmd/masterservice/main.go @@ -36,6 +36,9 @@ func main() { psc.Params.Init() log.Printf("proxy service address : %s", psc.Params.ServiceAddress) proxyService := psc.NewClient(psc.Params.ServiceAddress) + if err = proxyService.Init(); err != nil { + panic(err) + } for cnt = 0; cnt < reTryCnt; cnt++ { pxStates, err := proxyService.GetComponentStates() diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go index 83400a8ea2..48827b965b 100644 --- a/internal/datanode/allocator.go +++ b/internal/datanode/allocator.go @@ -24,7 +24,7 @@ func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl { func (alloc *allocatorImpl) allocID() (UniqueID, error) { resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kShowCollections, + MsgType: commonpb.MsgType_kRequestID, MsgID: 1, // GOOSE TODO Timestamp: 0, // GOOSE TODO SourceID: Params.NodeID, diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 1cbdf38d3c..7de7101c19 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -1,20 +1,15 @@ package datanode import ( - "context" - "fmt" "log" "math/rand" "os" "strconv" "testing" - "time" "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - "github.com/zilliztech/milvus-distributed/internal/master" ) func makeNewChannelNames(names []string, suffix string) []string { @@ -36,52 +31,10 @@ func refreshChannelNames() { Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix) } -func startMaster(ctx context.Context) { - master.Init() - etcdAddr := master.Params.EtcdAddress - metaRootPath := master.Params.MetaRootPath - - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - if err != nil { - panic(err) - } - _, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix()) - if err != nil { - panic(err) - } - - masterPort := 53101 - master.Params.Port = masterPort - svr, err := master.CreateServer(ctx) - if err != nil { - log.Print("create server failed", zap.Error(err)) - } - if err := svr.Run(int64(master.Params.Port)); err != nil { - log.Fatal("run server failed", zap.Error(err)) - } - - fmt.Println("Waiting for server!", svr.IsServing()) - Params.MasterAddress = master.Params.Address + ":" + strconv.Itoa(masterPort) -} - func TestMain(m *testing.M) { Params.Init() refreshChannelNames() - const ctxTimeInMillisecond = 2000 - const closeWithDeadline = true - var ctx context.Context - - if closeWithDeadline { - var cancel context.CancelFunc - d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, cancel = context.WithDeadline(context.Background(), d) - defer cancel() - } else { - ctx = context.Background() - } - - startMaster(ctx) exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/datanode/meta_table.go b/internal/datanode/meta_table.go index 898a6aa64a..6a9128ff86 100644 --- a/internal/datanode/meta_table.go +++ b/internal/datanode/meta_table.go @@ -12,9 +12,9 @@ import ( ) type metaTable struct { - client kv.TxnBase // + client kv.Base // segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta - collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta // GOOSE TODO: addDDLFlush and has DDLFlush + collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta lock sync.RWMutex } @@ -36,24 +36,6 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable, error) { return mt, nil } -func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error { - mt.lock.Lock() - defer mt.lock.Unlock() - - _, ok := mt.collID2DdlMeta[collID] - if !ok { - mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{ - CollectionID: collID, - BinlogPaths: make([]string, 0), - } - } - - meta := mt.collID2DdlMeta[collID] - meta.BinlogPaths = append(meta.BinlogPaths, paths...) - - return mt.saveDDLFlushMeta(meta) -} - func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error { _, ok := mt.segID2FlushMeta[segmentID] if !ok { @@ -97,44 +79,6 @@ func (mt *metaTable) CompleteFlush(segmentID UniqueID) error { return mt.saveSegFlushMeta(meta) } -// metaTable.lock.Lock() before call this function -func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error { - value := proto.MarshalTextString(meta) - - mt.collID2DdlMeta[meta.CollectionID] = meta - prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10)) - - return mt.client.Save(prefix, value) -} - -func (mt *metaTable) reloadDdlMetaFromKV() error { - mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta) - _, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath) - if err != nil { - return err - } - - for _, value := range values { - ddlMeta := &datapb.DDLFlushMeta{} - err = proto.UnmarshalText(value, ddlMeta) - if err != nil { - return err - } - mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta - } - return nil -} - -// metaTable.lock.Lock() before call this function -func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error { - value := proto.MarshalTextString(meta) - - mt.segID2FlushMeta[meta.SegmentID] = meta - prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10)) - - return mt.client.Save(prefix, value) -} - func (mt *metaTable) reloadSegMetaFromKV() error { mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta) @@ -155,6 +99,16 @@ func (mt *metaTable) reloadSegMetaFromKV() error { return nil } +// metaTable.lock.Lock() before call this function +func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error { + value := proto.MarshalTextString(meta) + + mt.segID2FlushMeta[meta.SegmentID] = meta + prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10)) + + return mt.client.Save(prefix, value) +} + func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() @@ -197,6 +151,61 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, return ret, nil } +// --- DDL --- +func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error { + mt.lock.Lock() + defer mt.lock.Unlock() + + _, ok := mt.collID2DdlMeta[collID] + if !ok { + mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{ + CollectionID: collID, + BinlogPaths: make([]string, 0), + } + } + + meta := mt.collID2DdlMeta[collID] + meta.BinlogPaths = append(meta.BinlogPaths, paths...) + + return mt.saveDDLFlushMeta(meta) +} + +func (mt *metaTable) hasDDLFlushMeta(collID UniqueID) bool { + mt.lock.RLock() + defer mt.lock.RUnlock() + + _, ok := mt.collID2DdlMeta[collID] + return ok +} + +// metaTable.lock.Lock() before call this function +func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error { + value := proto.MarshalTextString(meta) + + mt.collID2DdlMeta[meta.CollectionID] = meta + prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10)) + + return mt.client.Save(prefix, value) +} + +func (mt *metaTable) reloadDdlMetaFromKV() error { + mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta) + _, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath) + if err != nil { + return err + } + + for _, value := range values { + ddlMeta := &datapb.DDLFlushMeta{} + err = proto.UnmarshalText(value, ddlMeta) + if err != nil { + return err + } + mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta + } + return nil +} + func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) { mt.lock.RLock() defer mt.lock.RUnlock() diff --git a/internal/datanode/meta_table_test.go b/internal/datanode/meta_table_test.go index dd5a4251dc..247cbff51f 100644 --- a/internal/datanode/meta_table_test.go +++ b/internal/datanode/meta_table_test.go @@ -1,26 +1,16 @@ package datanode import ( - "context" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" - "go.etcd.io/etcd/clientv3" + memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" ) -func TestMetaTable_all(t *testing.T) { +func TestMetaTable_SegmentFlush(t *testing.T) { - etcdAddr := Params.EtcdAddress - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - require.NoError(t, err) - etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/meta/root") - - _, err = cli.Delete(context.TODO(), "/etcd/test/meta/root", clientv3.WithPrefix()) - require.NoError(t, err) - - meta, err := NewMetaTable(etcdKV) + kvMock := memkv.NewMemoryKV() + meta, err := NewMetaTable(kvMock) assert.NoError(t, err) defer meta.client.Close() @@ -65,27 +55,6 @@ func TestMetaTable_all(t *testing.T) { ret) }) - t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) { - - collID2Paths := map[UniqueID][]string{ - 301: {"a", "b", "c"}, - 302: {"c", "b", "a"}, - } - - for collID, dataPaths := range collID2Paths { - for _, dp := range dataPaths { - err = meta.AppendDDLBinlogPaths(collID, []string{dp}) - assert.Nil(t, err) - } - } - - for k, v := range collID2Paths { - ret, err := meta.getDDLBinlogPaths(k) - assert.Nil(t, err) - assert.Equal(t, map[UniqueID][]string{k: v}, ret) - } - }) - t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) { var segmentID UniqueID = 401 @@ -105,3 +74,37 @@ func TestMetaTable_all(t *testing.T) { }) } + +func TestMetaTable_DDLFlush(t *testing.T) { + kvMock := memkv.NewMemoryKV() + meta, err := NewMetaTable(kvMock) + assert.NoError(t, err) + defer meta.client.Close() + + t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) { + + assert.False(t, meta.hasDDLFlushMeta(301)) + assert.False(t, meta.hasDDLFlushMeta(302)) + + collID2Paths := map[UniqueID][]string{ + 301: {"a", "b", "c"}, + 302: {"c", "b", "a"}, + } + + for collID, dataPaths := range collID2Paths { + for _, dp := range dataPaths { + err = meta.AppendDDLBinlogPaths(collID, []string{dp}) + assert.Nil(t, err) + } + } + + for k, v := range collID2Paths { + ret, err := meta.getDDLBinlogPaths(k) + assert.Nil(t, err) + assert.Equal(t, map[UniqueID][]string{k: v}, ret) + } + + assert.True(t, meta.hasDDLFlushMeta(301)) + assert.True(t, meta.hasDDLFlushMeta(302)) + }) +} diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 7617b07813..b295e685fe 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -123,7 +123,7 @@ func (s *Server) init() error { }() s.wg.Add(1) - s.startGrpcLoop(Params.Port) + go s.startGrpcLoop(Params.Port) // wait for grpc server loop start err = <-s.grpcErrChan if err != nil { diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index ece32912be..035321e55f 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -59,7 +59,7 @@ func (s *Server) init() error { proxyservice.Params.Init() s.wg.Add(1) - s.startGrpcLoop(Params.ServicePort) + go s.startGrpcLoop(Params.ServicePort) // wait for grpc server loop start if err := <-s.grpcErrChan; err != nil { return err diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 89d7b10749..d2d8ab7613 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -247,7 +247,9 @@ func (c *Core) checkInit() error { if c.DataNodeSegmentFlushCompletedChan == nil { return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil") } - log.Printf("master node id = %d\n", Params.NodeID) + log.Printf("master node id = %d", Params.NodeID) + log.Printf("master dd channel name = %s", Params.DdChannel) + log.Printf("master time ticke channel name = %s", Params.TimeTickChannel) return nil } @@ -607,6 +609,7 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { return err } Params.ProxyTimeTickChannel = rsp + log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel) c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ @@ -633,6 +636,8 @@ func (c *Core) SetDataService(s DataServiceInterface) error { return err } Params.DataServiceSegmentChannel = rsp + log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel) + c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { ts, err := c.tsoAllocator.Alloc(1) if err != nil { diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index fe975d9fc4..6f6287931d 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -39,8 +39,9 @@ func (s *ServiceImpl) fillNodeInitParams() error { getConfigContentByName := func(fileName string) []byte { _, fpath, _, _ := runtime.Caller(0) - configFile := path.Dir(fpath) + "/../../../configs/" + fileName + configFile := path.Dir(fpath) + "/../../configs/" + fileName _, err := os.Stat(configFile) + log.Printf("configFile = %s", configFile) if os.IsNotExist(err) { runPath, err := os.Getwd() if err != nil { diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index d91ee4d326..0a105d0d97 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -18,7 +18,7 @@ go test -race -cover "${MILVUS_DIR}/kv/..." -failfast # TODO: remove to distributed #go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast #go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast -#go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast +go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast #go test -race -cover "${MILVUS_DIR}/master/..." -failfast #go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast #go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast