mirror of https://github.com/milvus-io/milvus.git
Add watcher_test and fix some bugs
Signed-off-by: neza2017 <yefu.chen@zilliz.com>pull/4973/head^2
parent
f088467dde
commit
bdf84f08ab
|
@ -36,6 +36,9 @@ func main() {
|
|||
psc.Params.Init()
|
||||
log.Printf("proxy service address : %s", psc.Params.ServiceAddress)
|
||||
proxyService := psc.NewClient(psc.Params.ServiceAddress)
|
||||
if err = proxyService.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for cnt = 0; cnt < reTryCnt; cnt++ {
|
||||
pxStates, err := proxyService.GetComponentStates()
|
||||
|
|
|
@ -24,7 +24,7 @@ func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl {
|
|||
func (alloc *allocatorImpl) allocID() (UniqueID, error) {
|
||||
resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_kShowCollections,
|
||||
MsgType: commonpb.MsgType_kRequestID,
|
||||
MsgID: 1, // GOOSE TODO
|
||||
Timestamp: 0, // GOOSE TODO
|
||||
SourceID: Params.NodeID,
|
||||
|
|
|
@ -1,20 +1,15 @@
|
|||
package datanode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
"github.com/zilliztech/milvus-distributed/internal/master"
|
||||
)
|
||||
|
||||
func makeNewChannelNames(names []string, suffix string) []string {
|
||||
|
@ -36,52 +31,10 @@ func refreshChannelNames() {
|
|||
Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
|
||||
}
|
||||
|
||||
func startMaster(ctx context.Context) {
|
||||
master.Init()
|
||||
etcdAddr := master.Params.EtcdAddress
|
||||
metaRootPath := master.Params.MetaRootPath
|
||||
|
||||
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
masterPort := 53101
|
||||
master.Params.Port = masterPort
|
||||
svr, err := master.CreateServer(ctx)
|
||||
if err != nil {
|
||||
log.Print("create server failed", zap.Error(err))
|
||||
}
|
||||
if err := svr.Run(int64(master.Params.Port)); err != nil {
|
||||
log.Fatal("run server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
fmt.Println("Waiting for server!", svr.IsServing())
|
||||
Params.MasterAddress = master.Params.Address + ":" + strconv.Itoa(masterPort)
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
Params.Init()
|
||||
|
||||
refreshChannelNames()
|
||||
const ctxTimeInMillisecond = 2000
|
||||
const closeWithDeadline = true
|
||||
var ctx context.Context
|
||||
|
||||
if closeWithDeadline {
|
||||
var cancel context.CancelFunc
|
||||
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
|
||||
ctx, cancel = context.WithDeadline(context.Background(), d)
|
||||
defer cancel()
|
||||
} else {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
startMaster(ctx)
|
||||
exitCode := m.Run()
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
|
|
@ -12,9 +12,9 @@ import (
|
|||
)
|
||||
|
||||
type metaTable struct {
|
||||
client kv.TxnBase //
|
||||
client kv.Base //
|
||||
segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta
|
||||
collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta // GOOSE TODO: addDDLFlush and has DDLFlush
|
||||
collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta
|
||||
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
@ -36,24 +36,6 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable, error) {
|
|||
return mt, nil
|
||||
}
|
||||
|
||||
func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
_, ok := mt.collID2DdlMeta[collID]
|
||||
if !ok {
|
||||
mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{
|
||||
CollectionID: collID,
|
||||
BinlogPaths: make([]string, 0),
|
||||
}
|
||||
}
|
||||
|
||||
meta := mt.collID2DdlMeta[collID]
|
||||
meta.BinlogPaths = append(meta.BinlogPaths, paths...)
|
||||
|
||||
return mt.saveDDLFlushMeta(meta)
|
||||
}
|
||||
|
||||
func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error {
|
||||
_, ok := mt.segID2FlushMeta[segmentID]
|
||||
if !ok {
|
||||
|
@ -97,44 +79,6 @@ func (mt *metaTable) CompleteFlush(segmentID UniqueID) error {
|
|||
return mt.saveSegFlushMeta(meta)
|
||||
}
|
||||
|
||||
// metaTable.lock.Lock() before call this function
|
||||
func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error {
|
||||
value := proto.MarshalTextString(meta)
|
||||
|
||||
mt.collID2DdlMeta[meta.CollectionID] = meta
|
||||
prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10))
|
||||
|
||||
return mt.client.Save(prefix, value)
|
||||
}
|
||||
|
||||
func (mt *metaTable) reloadDdlMetaFromKV() error {
|
||||
mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta)
|
||||
_, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, value := range values {
|
||||
ddlMeta := &datapb.DDLFlushMeta{}
|
||||
err = proto.UnmarshalText(value, ddlMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// metaTable.lock.Lock() before call this function
|
||||
func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error {
|
||||
value := proto.MarshalTextString(meta)
|
||||
|
||||
mt.segID2FlushMeta[meta.SegmentID] = meta
|
||||
prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10))
|
||||
|
||||
return mt.client.Save(prefix, value)
|
||||
}
|
||||
|
||||
func (mt *metaTable) reloadSegMetaFromKV() error {
|
||||
mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta)
|
||||
|
||||
|
@ -155,6 +99,16 @@ func (mt *metaTable) reloadSegMetaFromKV() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// metaTable.lock.Lock() before call this function
|
||||
func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error {
|
||||
value := proto.MarshalTextString(meta)
|
||||
|
||||
mt.segID2FlushMeta[meta.SegmentID] = meta
|
||||
prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10))
|
||||
|
||||
return mt.client.Save(prefix, value)
|
||||
}
|
||||
|
||||
func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
@ -197,6 +151,61 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string,
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
// --- DDL ---
|
||||
func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error {
|
||||
mt.lock.Lock()
|
||||
defer mt.lock.Unlock()
|
||||
|
||||
_, ok := mt.collID2DdlMeta[collID]
|
||||
if !ok {
|
||||
mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{
|
||||
CollectionID: collID,
|
||||
BinlogPaths: make([]string, 0),
|
||||
}
|
||||
}
|
||||
|
||||
meta := mt.collID2DdlMeta[collID]
|
||||
meta.BinlogPaths = append(meta.BinlogPaths, paths...)
|
||||
|
||||
return mt.saveDDLFlushMeta(meta)
|
||||
}
|
||||
|
||||
func (mt *metaTable) hasDDLFlushMeta(collID UniqueID) bool {
|
||||
mt.lock.RLock()
|
||||
defer mt.lock.RUnlock()
|
||||
|
||||
_, ok := mt.collID2DdlMeta[collID]
|
||||
return ok
|
||||
}
|
||||
|
||||
// metaTable.lock.Lock() before call this function
|
||||
func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error {
|
||||
value := proto.MarshalTextString(meta)
|
||||
|
||||
mt.collID2DdlMeta[meta.CollectionID] = meta
|
||||
prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10))
|
||||
|
||||
return mt.client.Save(prefix, value)
|
||||
}
|
||||
|
||||
func (mt *metaTable) reloadDdlMetaFromKV() error {
|
||||
mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta)
|
||||
_, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, value := range values {
|
||||
ddlMeta := &datapb.DDLFlushMeta{}
|
||||
err = proto.UnmarshalText(value, ddlMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) {
|
||||
mt.lock.RLock()
|
||||
defer mt.lock.RUnlock()
|
||||
|
|
|
@ -1,26 +1,16 @@
|
|||
package datanode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
|
||||
)
|
||||
|
||||
func TestMetaTable_all(t *testing.T) {
|
||||
func TestMetaTable_SegmentFlush(t *testing.T) {
|
||||
|
||||
etcdAddr := Params.EtcdAddress
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
require.NoError(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/meta/root")
|
||||
|
||||
_, err = cli.Delete(context.TODO(), "/etcd/test/meta/root", clientv3.WithPrefix())
|
||||
require.NoError(t, err)
|
||||
|
||||
meta, err := NewMetaTable(etcdKV)
|
||||
kvMock := memkv.NewMemoryKV()
|
||||
meta, err := NewMetaTable(kvMock)
|
||||
assert.NoError(t, err)
|
||||
defer meta.client.Close()
|
||||
|
||||
|
@ -65,27 +55,6 @@ func TestMetaTable_all(t *testing.T) {
|
|||
ret)
|
||||
})
|
||||
|
||||
t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) {
|
||||
|
||||
collID2Paths := map[UniqueID][]string{
|
||||
301: {"a", "b", "c"},
|
||||
302: {"c", "b", "a"},
|
||||
}
|
||||
|
||||
for collID, dataPaths := range collID2Paths {
|
||||
for _, dp := range dataPaths {
|
||||
err = meta.AppendDDLBinlogPaths(collID, []string{dp})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range collID2Paths {
|
||||
ret, err := meta.getDDLBinlogPaths(k)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, map[UniqueID][]string{k: v}, ret)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) {
|
||||
|
||||
var segmentID UniqueID = 401
|
||||
|
@ -105,3 +74,37 @@ func TestMetaTable_all(t *testing.T) {
|
|||
})
|
||||
|
||||
}
|
||||
|
||||
func TestMetaTable_DDLFlush(t *testing.T) {
|
||||
kvMock := memkv.NewMemoryKV()
|
||||
meta, err := NewMetaTable(kvMock)
|
||||
assert.NoError(t, err)
|
||||
defer meta.client.Close()
|
||||
|
||||
t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) {
|
||||
|
||||
assert.False(t, meta.hasDDLFlushMeta(301))
|
||||
assert.False(t, meta.hasDDLFlushMeta(302))
|
||||
|
||||
collID2Paths := map[UniqueID][]string{
|
||||
301: {"a", "b", "c"},
|
||||
302: {"c", "b", "a"},
|
||||
}
|
||||
|
||||
for collID, dataPaths := range collID2Paths {
|
||||
for _, dp := range dataPaths {
|
||||
err = meta.AppendDDLBinlogPaths(collID, []string{dp})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range collID2Paths {
|
||||
ret, err := meta.getDDLBinlogPaths(k)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, map[UniqueID][]string{k: v}, ret)
|
||||
}
|
||||
|
||||
assert.True(t, meta.hasDDLFlushMeta(301))
|
||||
assert.True(t, meta.hasDDLFlushMeta(302))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -123,7 +123,7 @@ func (s *Server) init() error {
|
|||
}()
|
||||
|
||||
s.wg.Add(1)
|
||||
s.startGrpcLoop(Params.Port)
|
||||
go s.startGrpcLoop(Params.Port)
|
||||
// wait for grpc server loop start
|
||||
err = <-s.grpcErrChan
|
||||
if err != nil {
|
||||
|
|
|
@ -59,7 +59,7 @@ func (s *Server) init() error {
|
|||
proxyservice.Params.Init()
|
||||
|
||||
s.wg.Add(1)
|
||||
s.startGrpcLoop(Params.ServicePort)
|
||||
go s.startGrpcLoop(Params.ServicePort)
|
||||
// wait for grpc server loop start
|
||||
if err := <-s.grpcErrChan; err != nil {
|
||||
return err
|
||||
|
|
|
@ -247,7 +247,9 @@ func (c *Core) checkInit() error {
|
|||
if c.DataNodeSegmentFlushCompletedChan == nil {
|
||||
return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil")
|
||||
}
|
||||
log.Printf("master node id = %d\n", Params.NodeID)
|
||||
log.Printf("master node id = %d", Params.NodeID)
|
||||
log.Printf("master dd channel name = %s", Params.DdChannel)
|
||||
log.Printf("master time ticke channel name = %s", Params.TimeTickChannel)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -607,6 +609,7 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
|
|||
return err
|
||||
}
|
||||
Params.ProxyTimeTickChannel = rsp
|
||||
log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel)
|
||||
|
||||
c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error {
|
||||
err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{
|
||||
|
@ -633,6 +636,8 @@ func (c *Core) SetDataService(s DataServiceInterface) error {
|
|||
return err
|
||||
}
|
||||
Params.DataServiceSegmentChannel = rsp
|
||||
log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel)
|
||||
|
||||
c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
|
||||
ts, err := c.tsoAllocator.Alloc(1)
|
||||
if err != nil {
|
||||
|
|
|
@ -39,8 +39,9 @@ func (s *ServiceImpl) fillNodeInitParams() error {
|
|||
|
||||
getConfigContentByName := func(fileName string) []byte {
|
||||
_, fpath, _, _ := runtime.Caller(0)
|
||||
configFile := path.Dir(fpath) + "/../../../configs/" + fileName
|
||||
configFile := path.Dir(fpath) + "/../../configs/" + fileName
|
||||
_, err := os.Stat(configFile)
|
||||
log.Printf("configFile = %s", configFile)
|
||||
if os.IsNotExist(err) {
|
||||
runPath, err := os.Getwd()
|
||||
if err != nil {
|
||||
|
|
|
@ -18,7 +18,7 @@ go test -race -cover "${MILVUS_DIR}/kv/..." -failfast
|
|||
# TODO: remove to distributed
|
||||
#go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast
|
||||
#go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast
|
||||
#go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast
|
||||
go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast
|
||||
#go test -race -cover "${MILVUS_DIR}/master/..." -failfast
|
||||
#go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast
|
||||
#go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast
|
||||
|
|
Loading…
Reference in New Issue