mirror of https://github.com/milvus-io/milvus.git
parent
d9d2f33a23
commit
6ffe8739bf
|
@ -126,7 +126,7 @@ func TestSegmentManager_AssignSegment(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
time.Sleep(time.Duration(Params.SegIDAssignExpiration))
|
||||
time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)
|
||||
timestamp, err := globalTsoAllocator()
|
||||
assert.Nil(t, err)
|
||||
err = mt.UpdateSegment(&pb.SegmentMeta{
|
||||
|
@ -156,3 +156,122 @@ func TestSegmentManager_AssignSegment(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.NotEqualValues(t, 0, segMeta.CloseTime)
|
||||
}
|
||||
|
||||
func TestSegmentManager_SycnWritenode(t *testing.T) {
|
||||
ctx, cancelFunc := context.WithCancel(context.TODO())
|
||||
defer cancelFunc()
|
||||
|
||||
Init()
|
||||
Params.TopicNum = 5
|
||||
Params.QueryNodeNum = 3
|
||||
Params.SegmentSize = 536870912 / 1024 / 1024
|
||||
Params.SegmentSizeFactor = 0.75
|
||||
Params.DefaultRecordSize = 1024
|
||||
Params.MinSegIDAssignCnt = 1048576 / 1024
|
||||
Params.SegIDAssignExpiration = 2000
|
||||
etcdAddress := Params.EtcdAddress
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
|
||||
assert.Nil(t, err)
|
||||
rootPath := "/test/root"
|
||||
_, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
kvBase := etcdkv.NewEtcdKV(cli, rootPath)
|
||||
defer kvBase.Close()
|
||||
mt, err := NewMetaTable(kvBase)
|
||||
assert.Nil(t, err)
|
||||
|
||||
collName := "segmgr_test_coll"
|
||||
var collID int64 = 1001
|
||||
partitionTag := "test_part"
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Name: collName,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{FieldID: 1, Name: "f1", IsPrimaryKey: false, DataType: schemapb.DataType_INT32},
|
||||
{FieldID: 2, Name: "f2", IsPrimaryKey: false, DataType: schemapb.DataType_VECTOR_FLOAT, TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: "dim", Value: "128"},
|
||||
}},
|
||||
},
|
||||
}
|
||||
err = mt.AddCollection(&pb.CollectionMeta{
|
||||
ID: collID,
|
||||
Schema: schema,
|
||||
CreateTime: 0,
|
||||
SegmentIDs: []UniqueID{},
|
||||
PartitionTags: []string{},
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
err = mt.AddPartition(collID, partitionTag)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var cnt int64
|
||||
globalIDAllocator := func() (UniqueID, error) {
|
||||
val := atomic.AddInt64(&cnt, 1)
|
||||
return val, nil
|
||||
}
|
||||
globalTsoAllocator := func() (Timestamp, error) {
|
||||
val := atomic.AddInt64(&cnt, 1)
|
||||
phy := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
ts := tsoutil.ComposeTS(phy, val)
|
||||
return ts, nil
|
||||
}
|
||||
syncWriteChan := make(chan *msgstream.TimeTickMsg)
|
||||
syncProxyChan := make(chan *msgstream.TimeTickMsg)
|
||||
|
||||
segAssigner := NewSegmentAssigner(ctx, mt, globalTsoAllocator, syncProxyChan)
|
||||
mockScheduler := &MockFlushScheduler{}
|
||||
segManager, err := NewSegmentManager(ctx, mt, globalIDAllocator, globalTsoAllocator, syncWriteChan, mockScheduler, segAssigner)
|
||||
assert.Nil(t, err)
|
||||
|
||||
segManager.Start()
|
||||
defer segManager.Close()
|
||||
sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
|
||||
assert.Nil(t, err)
|
||||
maxCount := uint32(Params.SegmentSize * 1024 * 1024 / float64(sizePerRecord))
|
||||
|
||||
req := []*internalpb.SegIDRequest{
|
||||
{Count: maxCount, ChannelID: 1, CollName: collName, PartitionTag: partitionTag},
|
||||
{Count: maxCount, ChannelID: 2, CollName: collName, PartitionTag: partitionTag},
|
||||
{Count: maxCount, ChannelID: 3, CollName: collName, PartitionTag: partitionTag},
|
||||
}
|
||||
assignSegment, err := segManager.AssignSegment(req)
|
||||
assert.Nil(t, err)
|
||||
timestamp, err := globalTsoAllocator()
|
||||
assert.Nil(t, err)
|
||||
for i := 0; i < len(assignSegment); i++ {
|
||||
assert.EqualValues(t, maxCount, assignSegment[i].Count)
|
||||
assert.EqualValues(t, i+1, assignSegment[i].ChannelID)
|
||||
|
||||
err = mt.UpdateSegment(&pb.SegmentMeta{
|
||||
SegmentID: assignSegment[i].SegID,
|
||||
CollectionID: collID,
|
||||
PartitionTag: partitionTag,
|
||||
ChannelStart: 0,
|
||||
ChannelEnd: 1,
|
||||
CloseTime: timestamp,
|
||||
NumRows: int64(maxCount),
|
||||
MemSize: 500000,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)
|
||||
|
||||
timestamp, err = globalTsoAllocator()
|
||||
assert.Nil(t, err)
|
||||
tsMsg := &msgstream.TimeTickMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
BeginTimestamp: timestamp, EndTimestamp: timestamp, HashValues: []uint32{},
|
||||
},
|
||||
TimeTickMsg: internalpb.TimeTickMsg{
|
||||
MsgType: internalpb.MsgType_kTimeTick,
|
||||
PeerID: 1,
|
||||
Timestamp: timestamp,
|
||||
},
|
||||
}
|
||||
syncWriteChan <- tsMsg
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
status := segManager.collStatus[collID]
|
||||
assert.Empty(t, status.segments)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue