milvus/internal/master/time_snyc_producer_test.go

124 lines
3.8 KiB
Go

package master
import (
"context"
"log"
"testing"
"time"
"github.com/stretchr/testify/assert"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
)
type (
TestTickBarrier struct {
value int64
ctx context.Context
}
)
func (ttBarrier *TestTickBarrier) GetTimeTick() (Timestamp, error) {
time.Sleep(1 * time.Second)
ttBarrier.value++
return Timestamp(ttBarrier.value), nil
}
func (ttBarrier *TestTickBarrier) Start() error {
go func(ctx context.Context) {
<-ctx.Done()
log.Printf("barrier context done, exit")
}(ttBarrier.ctx)
return nil
}
func (ttBarrier *TestTickBarrier) Close() {
_, cancel := context.WithCancel(context.Background())
cancel()
}
func initTestPulsarStream(ctx context.Context, pulsarAddress string,
producerChannels []string,
consumerChannels []string,
consumerSubName string, opts ...ms.RepackFunc) (*ms.MsgStream, *ms.MsgStream) {
// set input stream
inputStream := ms.NewPulsarMsgStream(ctx, 100)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
}
var input ms.MsgStream = inputStream
// set output stream
outputStream := ms.NewPulsarMsgStream(ctx, 100)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := ms.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
var output ms.MsgStream = outputStream
return &input, &output
}
func receiveMsg(stream *ms.MsgStream) []uint64 {
receiveCount := 0
var results []uint64
for {
result := (*stream).Consume()
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
timetickmsg := v.(*ms.TimeTickMsg)
results = append(results, timetickmsg.TimeTickMsg.Timestamp)
receiveCount++
if receiveCount == 10 {
return results
}
}
}
}
}
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
Init()
pulsarAddress := Params.PulsarAddress
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
producerChannels := []string{"proxyDMTtBarrier"}
consumerChannels := []string{"proxyDMTtBarrier"}
consumerSubName := "proxyDMTtBarrier"
proxyDMTtInputStream, proxyDMTtOutputStream := initTestPulsarStream(ctx, pulsarAddress, producerChannels, consumerChannels, consumerSubName)
producerChannels = []string{"proxyDDTtBarrier"}
consumerChannels = []string{"proxyDDTtBarrier"}
consumerSubName = "proxyDDTtBarrier"
proxyDDTtInputStream, proxyDDTtOutputStream := initTestPulsarStream(ctx, pulsarAddress, producerChannels, consumerChannels, consumerSubName)
producerChannels = []string{"writeNodeBarrier"}
consumerChannels = []string{"writeNodeBarrier"}
consumerSubName = "writeNodeBarrier"
writeNodeInputStream, writeNodeOutputStream := initTestPulsarStream(ctx, pulsarAddress, producerChannels, consumerChannels, consumerSubName)
timeSyncProducer, _ := NewTimeSyncMsgProducer(ctx)
timeSyncProducer.SetProxyTtBarrier(&TestTickBarrier{ctx: ctx})
timeSyncProducer.SetWriteNodeTtBarrier(&TestTickBarrier{ctx: ctx})
timeSyncProducer.SetDMSyncStream(*proxyDMTtInputStream)
timeSyncProducer.SetDDSyncStream(*proxyDDTtInputStream)
timeSyncProducer.SetK2sSyncStream(*writeNodeInputStream)
(*proxyDMTtOutputStream).Start()
(*proxyDDTtOutputStream).Start()
(*writeNodeOutputStream).Start()
timeSyncProducer.Start()
expected := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result1 := receiveMsg(proxyDMTtOutputStream)
assert.Equal(t, expected, result1)
expected = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result1 = receiveMsg(proxyDDTtOutputStream)
assert.Equal(t, expected, result1)
expected = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
result2 := receiveMsg(writeNodeOutputStream)
assert.Equal(t, expected, result2)
timeSyncProducer.Close()
}