Add watcher_test and fix some bugs

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-01-28 10:14:31 +08:00 committed by yefu.chen
parent b597a637f3
commit f088467dde
14 changed files with 328 additions and 155 deletions

View File

@ -36,9 +36,6 @@ func main() {
psc.Params.Init()
log.Printf("proxy service address : %s", psc.Params.ServiceAddress)
proxyService := psc.NewClient(psc.Params.ServiceAddress)
if err = proxyService.Init(); err != nil {
panic(err)
}
for cnt = 0; cnt < reTryCnt; cnt++ {
pxStates, err := proxyService.GetComponentStates()

View File

@ -24,7 +24,7 @@ func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl {
func (alloc *allocatorImpl) allocID() (UniqueID, error) {
resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kRequestID,
MsgType: commonpb.MsgType_kShowCollections,
MsgID: 1, // GOOSE TODO
Timestamp: 0, // GOOSE TODO
SourceID: Params.NodeID,

View File

@ -1,15 +1,20 @@
package datanode
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"strconv"
"testing"
"time"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/master"
)
func makeNewChannelNames(names []string, suffix string) []string {
@ -31,10 +36,52 @@ func refreshChannelNames() {
Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
}
func startMaster(ctx context.Context) {
master.Init()
etcdAddr := master.Params.EtcdAddress
metaRootPath := master.Params.MetaRootPath
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
if err != nil {
panic(err)
}
_, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix())
if err != nil {
panic(err)
}
masterPort := 53101
master.Params.Port = masterPort
svr, err := master.CreateServer(ctx)
if err != nil {
log.Print("create server failed", zap.Error(err))
}
if err := svr.Run(int64(master.Params.Port)); err != nil {
log.Fatal("run server failed", zap.Error(err))
}
fmt.Println("Waiting for server!", svr.IsServing())
Params.MasterAddress = master.Params.Address + ":" + strconv.Itoa(masterPort)
}
func TestMain(m *testing.M) {
Params.Init()
refreshChannelNames()
const ctxTimeInMillisecond = 2000
const closeWithDeadline = true
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
startMaster(ctx)
exitCode := m.Run()
os.Exit(exitCode)
}
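One nuance in the TestMain above: the deferred cancel never runs, because os.Exit terminates the process before deferred calls execute. A minimal sketch (not part of this commit, assuming the startMaster helper and package layout above) that cancels the context explicitly before exiting:

```go
package datanode

import (
	"context"
	"os"
	"testing"
	"time"
)

// testMainSketch mirrors TestMain above, but calls cancel() explicitly:
// os.Exit skips deferred functions, so "defer cancel()" would never run.
func testMainSketch(m *testing.M) {
	d := time.Now().Add(2000 * time.Millisecond)
	ctx, cancel := context.WithDeadline(context.Background(), d)

	startMaster(ctx) // the helper defined above

	exitCode := m.Run()
	cancel() // release the context before exiting
	os.Exit(exitCode)
}
```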

View File

@ -12,9 +12,9 @@ import (
)
type metaTable struct {
client kv.Base //
client kv.TxnBase //
segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta
collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta
collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta // GOOSE TODO: addDDLFlush and has DDLFlush
lock sync.RWMutex
}
@ -36,6 +36,24 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable, error) {
return mt, nil
}
func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error {
mt.lock.Lock()
defer mt.lock.Unlock()
_, ok := mt.collID2DdlMeta[collID]
if !ok {
mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{
CollectionID: collID,
BinlogPaths: make([]string, 0),
}
}
meta := mt.collID2DdlMeta[collID]
meta.BinlogPaths = append(meta.BinlogPaths, paths...)
return mt.saveDDLFlushMeta(meta)
}
func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error {
_, ok := mt.segID2FlushMeta[segmentID]
if !ok {
@ -79,6 +97,44 @@ func (mt *metaTable) CompleteFlush(segmentID UniqueID) error {
return mt.saveSegFlushMeta(meta)
}
// metaTable.lock.Lock() before calling this function
func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error {
value := proto.MarshalTextString(meta)
mt.collID2DdlMeta[meta.CollectionID] = meta
prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10))
return mt.client.Save(prefix, value)
}
func (mt *metaTable) reloadDdlMetaFromKV() error {
mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta)
_, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath)
if err != nil {
return err
}
for _, value := range values {
ddlMeta := &datapb.DDLFlushMeta{}
err = proto.UnmarshalText(value, ddlMeta)
if err != nil {
return err
}
mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta
}
return nil
}
// metaTable.lock.Lock() before calling this function
func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error {
value := proto.MarshalTextString(meta)
mt.segID2FlushMeta[meta.SegmentID] = meta
prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10))
return mt.client.Save(prefix, value)
}
func (mt *metaTable) reloadSegMetaFromKV() error {
mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta)
@ -99,16 +155,6 @@ func (mt *metaTable) reloadSegMetaFromKV() error {
return nil
}
// metaTable.lock.Lock() before calling this function
func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error {
value := proto.MarshalTextString(meta)
mt.segID2FlushMeta[meta.SegmentID] = meta
prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10))
return mt.client.Save(prefix, value)
}
func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
@ -151,61 +197,6 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string,
return ret, nil
}
// --- DDL ---
func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error {
mt.lock.Lock()
defer mt.lock.Unlock()
_, ok := mt.collID2DdlMeta[collID]
if !ok {
mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{
CollectionID: collID,
BinlogPaths: make([]string, 0),
}
}
meta := mt.collID2DdlMeta[collID]
meta.BinlogPaths = append(meta.BinlogPaths, paths...)
return mt.saveDDLFlushMeta(meta)
}
func (mt *metaTable) hasDDLFlushMeta(collID UniqueID) bool {
mt.lock.RLock()
defer mt.lock.RUnlock()
_, ok := mt.collID2DdlMeta[collID]
return ok
}
// metaTable.lock.Lock() before calling this function
func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error {
value := proto.MarshalTextString(meta)
mt.collID2DdlMeta[meta.CollectionID] = meta
prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10))
return mt.client.Save(prefix, value)
}
func (mt *metaTable) reloadDdlMetaFromKV() error {
mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta)
_, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath)
if err != nil {
return err
}
for _, value := range values {
ddlMeta := &datapb.DDLFlushMeta{}
err = proto.UnmarshalText(value, ddlMeta)
if err != nil {
return err
}
mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta
}
return nil
}
func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) {
mt.lock.RLock()
defer mt.lock.RUnlock()
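For reference, a small standalone sketch of the key layout used by saveDDLFlushMeta and saveSegFlushMeta above: the stored value is the text-marshaled protobuf message, and the key is a configured sub-path joined with the decimal ID. The sub-path strings below are placeholders standing in for Params.DDLFlushMetaSubPath and Params.SegFlushMetaSubPath:

```go
package main

import (
	"fmt"
	"path"
	"strconv"
)

func main() {
	// DDL flush meta is keyed per collection, segment flush meta per segment.
	ddlSubPath, segSubPath := "datanode/ddl", "datanode/segment" // placeholder sub-paths
	fmt.Println(path.Join(ddlSubPath, strconv.FormatInt(301, 10))) // datanode/ddl/301
	fmt.Println(path.Join(segSubPath, strconv.FormatInt(401, 10))) // datanode/segment/401
}
```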

View File

@ -1,16 +1,26 @@
package datanode
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
"github.com/stretchr/testify/require"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"go.etcd.io/etcd/clientv3"
)
func TestMetaTable_SegmentFlush(t *testing.T) {
func TestMetaTable_all(t *testing.T) {
kvMock := memkv.NewMemoryKV()
meta, err := NewMetaTable(kvMock)
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
require.NoError(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/meta/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/meta/root", clientv3.WithPrefix())
require.NoError(t, err)
meta, err := NewMetaTable(etcdKV)
assert.NoError(t, err)
defer meta.client.Close()
@ -55,6 +65,27 @@ func TestMetaTable_SegmentFlush(t *testing.T) {
ret)
})
t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) {
collID2Paths := map[UniqueID][]string{
301: {"a", "b", "c"},
302: {"c", "b", "a"},
}
for collID, dataPaths := range collID2Paths {
for _, dp := range dataPaths {
err = meta.AppendDDLBinlogPaths(collID, []string{dp})
assert.Nil(t, err)
}
}
for k, v := range collID2Paths {
ret, err := meta.getDDLBinlogPaths(k)
assert.Nil(t, err)
assert.Equal(t, map[UniqueID][]string{k: v}, ret)
}
})
t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) {
var segmentID UniqueID = 401
@ -74,37 +105,3 @@ func TestMetaTable_SegmentFlush(t *testing.T) {
})
}
func TestMetaTable_DDLFlush(t *testing.T) {
kvMock := memkv.NewMemoryKV()
meta, err := NewMetaTable(kvMock)
assert.NoError(t, err)
defer meta.client.Close()
t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) {
assert.False(t, meta.hasDDLFlushMeta(301))
assert.False(t, meta.hasDDLFlushMeta(302))
collID2Paths := map[UniqueID][]string{
301: {"a", "b", "c"},
302: {"c", "b", "a"},
}
for collID, dataPaths := range collID2Paths {
for _, dp := range dataPaths {
err = meta.AppendDDLBinlogPaths(collID, []string{dp})
assert.Nil(t, err)
}
}
for k, v := range collID2Paths {
ret, err := meta.getDDLBinlogPaths(k)
assert.Nil(t, err)
assert.Equal(t, map[UniqueID][]string{k: v}, ret)
}
assert.True(t, meta.hasDDLFlushMeta(301))
assert.True(t, meta.hasDDLFlushMeta(302))
})
}

View File

@ -49,13 +49,14 @@ func (cm *insertChannelManager) AllocChannels(collectionID UniqueID, groupNum in
group = make([]string, m)
}
for k := 0; k < len(group); k++ {
group = append(group, Params.InsertChannelPrefixName+strconv.Itoa(cm.count))
group[k] = Params.InsertChannelPrefixName + strconv.Itoa(cm.count)
cm.count++
}
i += int64(len(group))
j++
cg = append(cg, group)
}
cm.channelGroups[collectionID] = cg
return cg, nil
}
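The bug fixed here (and again in GetNodeIDs and GetInsertBinlogPaths further down) is appending to a slice created with a non-zero length, which keeps the zero-valued prefix and doubles the length. A standalone sketch of the pitfall and two correct alternatives:

```go
package main

import "fmt"

func main() {
	// make([]string, 3) already contains three empty strings; append grows it.
	buggy := make([]string, 3)
	buggy = append(buggy, "a", "b", "c")
	fmt.Println(len(buggy)) // 6 -- the first three elements are empty strings

	// Either index into the pre-sized slice...
	fixed := make([]string, 3)
	for i, v := range []string{"a", "b", "c"} {
		fixed[i] = v
	}
	// ...or start from length zero (optionally with capacity) and append.
	appended := make([]string, 0, 3)
	appended = append(appended, "a", "b", "c")
	fmt.Println(fixed, appended) // [a b c] [a b c]
}
```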

View File

@ -0,0 +1,38 @@
package dataservice
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestChannelAllocation(t *testing.T) {
Params.Init()
Params.InsertChannelNumPerCollection = 4
manager := newInsertChannelManager()
cases := []struct {
collectionID UniqueID
groupNum int
expectGroupNum int
success bool
}{
{1, 4, 4, true},
{1, 4, 4, false},
{2, 1, 1, true},
{3, 5, 4, true},
}
for _, c := range cases {
channels, err := manager.AllocChannels(c.collectionID, c.expectGroupNum)
if !c.success {
assert.NotNil(t, err)
continue
}
assert.Nil(t, err)
assert.EqualValues(t, c.expectGroupNum, len(channels))
total := 0
for _, channel := range channels {
total += len(channel)
}
assert.EqualValues(t, Params.InsertChannelNumPerCollection, total)
}
}

View File

@ -64,8 +64,8 @@ func (c *dataNodeCluster) GetNodeIDs() []int64 {
c.mu.RLock()
defer c.mu.RUnlock()
ret := make([]int64, len(c.nodes))
for _, node := range c.nodes {
ret = append(ret, node.id)
for i, node := range c.nodes {
ret[i] = node.id
}
return ret
}

View File

@ -84,6 +84,8 @@ func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl
}
func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
allocator.mu.Lock()
defer allocator.mu.Unlock()
if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID)
}
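The added defer matters because OpenSegment returns early on the duplicate-segment error; without it, that path would return while still holding the mutex. A minimal standalone sketch of the lock/defer-unlock idiom with an early return (the registry type here is illustrative only):

```go
package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	items map[int64]struct{}
}

// add mirrors the OpenSegment pattern: lock, defer the unlock, and then it is
// safe to return early on the "already exists" path without leaking the mutex.
func (r *registry) add(id int64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.items[id]; ok {
		return fmt.Errorf("item %d already exists", id)
	}
	r.items[id] = struct{}{}
	return nil
}

func main() {
	r := &registry{items: map[int64]struct{}{}}
	fmt.Println(r.add(1), r.add(1)) // <nil> item 1 already exists
}
```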

View File

@ -652,9 +652,9 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
}
fields := make([]UniqueID, len(flushMeta.Fields))
paths := make([]*internalpb2.StringList, len(flushMeta.Fields))
for _, field := range flushMeta.Fields {
fields = append(fields, field.FieldID)
paths = append(paths, &internalpb2.StringList{Values: field.BinlogPaths})
for i, field := range flushMeta.Fields {
fields[i] = field.FieldID
paths[i] = &internalpb2.StringList{Values: field.BinlogPaths}
}
resp.FieldIDs = fields
resp.Paths = paths
@ -674,7 +674,7 @@ func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string,
return nil, err
}
channels := make([]string, Params.InsertChannelNumPerCollection)
channels := make([]string, 0)
for _, group := range channelGroups {
channels = append(channels, group...)
}

View File

@ -69,40 +69,47 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context)
log.Println("data node time tick watcher closed")
return
case msg := <-watcher.msgQueue:
segments, err := watcher.allocator.GetSealedSegments()
if err != nil {
log.Printf("get sealed segments error %s", err.Error())
if err := watcher.handleTimeTickMsg(msg); err != nil {
log.Println(err.Error())
continue
}
for _, id := range segments {
expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp)
if err != nil {
log.Printf("check allocations expired error %s", err.Error())
continue
}
if expired {
segmentInfo, err := watcher.meta.GetSegment(id)
if err != nil {
log.Println(err.Error())
continue
}
if err = watcher.meta.SetSegmentState(id, datapb.SegmentState_SegmentSealed); err != nil {
log.Println(err.Error())
continue
}
watcher.cluster.FlushSegment(&datapb.FlushSegRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
MsgID: -1, // todo add msg id
Timestamp: 0, // todo
SourceID: -1, // todo
},
CollectionID: segmentInfo.CollectionID,
SegmentIDs: []int64{segmentInfo.SegmentID},
})
watcher.allocator.DropSegment(id)
}
}
}
}
}
func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTickMsg) error {
segments, err := watcher.allocator.GetSealedSegments()
if err != nil {
return err
}
for _, id := range segments {
expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp)
if err != nil {
log.Printf("check allocations expired error %s", err.Error())
continue
}
if expired {
segmentInfo, err := watcher.meta.GetSegment(id)
if err != nil {
log.Println(err.Error())
continue
}
if err = watcher.meta.SetSegmentState(id, datapb.SegmentState_SegmentSealed); err != nil {
log.Println(err.Error())
continue
}
watcher.cluster.FlushSegment(&datapb.FlushSegRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
MsgID: -1, // todo add msg id
Timestamp: 0, // todo
SourceID: Params.NodeID,
},
CollectionID: segmentInfo.CollectionID,
SegmentIDs: []int64{segmentInfo.SegmentID},
})
watcher.allocator.DropSegment(id)
}
}
return nil
}
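The point of this refactoring is testability: the select loop stays thin and the per-message work moves into handleTimeTickMsg, which the new watcher_test below can call directly. A minimal standalone sketch of the same pattern (the watcher type here is illustrative, not the dataservice one):

```go
package main

import (
	"context"
	"fmt"
	"log"
)

// watcher keeps the background loop thin and puts the per-message work in a
// separate method that a unit test can call without starting the loop.
type watcher struct {
	queue chan string
}

func (w *watcher) handle(msg string) error {
	if msg == "" {
		return fmt.Errorf("empty message")
	}
	fmt.Println("handled", msg)
	return nil
}

func (w *watcher) loop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-w.queue:
			if err := w.handle(msg); err != nil {
				log.Println(err)
			}
		}
	}
}

func main() {
	w := &watcher{queue: make(chan string, 1)}
	// A test exercises handle directly, without running loop in a goroutine.
	if err := w.handle("tick"); err != nil {
		log.Fatal(err)
	}
}
```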

View File

@ -0,0 +1,97 @@
package dataservice
import (
"strconv"
"testing"
"time"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/stretchr/testify/assert"
)
func TestDataNodeTTWatcher(t *testing.T) {
Params.Init()
c := make(chan struct{})
cluster := newDataNodeCluster(c)
defer cluster.ShutDownClients()
schema := newTestSchema()
allocator := newMockAllocator()
meta, err := newMemoryMeta(allocator)
assert.Nil(t, err)
segAllocator, err := newSegmentAllocator(meta, allocator)
assert.Nil(t, err)
watcher := newDataNodeTimeTickWatcher(meta, segAllocator, cluster)
id, err := allocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
Schema: schema,
ID: id,
})
assert.Nil(t, err)
cases := []struct {
sealed bool
allocation bool
expired bool
expected bool
}{
{false, false, true, false},
{false, true, true, false},
{false, true, false, false},
{true, false, true, true},
{true, true, false, false},
{true, true, true, true},
}
segmentIDs := make([]UniqueID, len(cases))
for i, c := range cases {
segID, err := allocator.allocID()
segmentIDs[i] = segID
assert.Nil(t, err)
segmentInfo, err := BuildSegment(id, 100, segID, []string{"channel" + strconv.Itoa(i)})
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
err = segAllocator.OpenSegment(segmentInfo)
assert.Nil(t, err)
if c.allocation && c.expired {
_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
assert.Nil(t, err)
}
}
time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)
for i, c := range cases {
if c.allocation && !c.expired {
_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
assert.Nil(t, err)
}
if c.sealed {
err := segAllocator.SealSegment(segmentIDs[i])
assert.Nil(t, err)
}
}
ts, err := allocator.allocTimestamp()
assert.Nil(t, err)
err = watcher.handleTimeTickMsg(&msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
TimeTickMsg: internalpb2.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kTimeTick,
Timestamp: ts,
},
},
})
assert.Nil(t, err)
for i, c := range cases {
_, ok := segAllocator.segments[segmentIDs[i]]
assert.EqualValues(t, !c.expected, ok)
}
}

View File

@ -247,9 +247,7 @@ func (c *Core) checkInit() error {
if c.DataNodeSegmentFlushCompletedChan == nil {
return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil")
}
log.Printf("master node id = %d", Params.NodeID)
log.Printf("master dd channel name = %s", Params.DdChannel)
log.Printf("master time ticke channel name = %s", Params.TimeTickChannel)
log.Printf("master node id = %d\n", Params.NodeID)
return nil
}
@ -609,7 +607,6 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
return err
}
Params.ProxyTimeTickChannel = rsp
log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel)
c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error {
err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{
@ -636,8 +633,6 @@ func (c *Core) SetDataService(s DataServiceInterface) error {
return err
}
Params.DataServiceSegmentChannel = rsp
log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel)
c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
ts, err := c.tsoAllocator.Alloc(1)
if err != nil {

View File

@ -18,7 +18,7 @@ go test -race -cover "${MILVUS_DIR}/kv/..." -failfast
# TODO: remove to distributed
#go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast
#go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast
go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast
#go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast
#go test -race -cover "${MILVUS_DIR}/master/..." -failfast
#go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast
#go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast
@ -28,3 +28,4 @@ go test -race -cover "${MILVUS_DIR}/msgstream/..." -failfast
go test -race -cover -v "${MILVUS_DIR}/masterservice" "${MILVUS_DIR}/distributed/masterservice" -failfast
#go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast
go test -race -cover "${MILVUS_DIR}/dataservice/..." -failfast