Add context to query node

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
ref: pull/4973/head^2
author: bigsheeper, 2020-10-14 11:07:25 +08:00 (committed by yefu.chen)
parent af3c14a8c4
commit 721998050b
13 changed files with 411 additions and 161 deletions
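
The change threads a context.Context from main through StartQueryNode, MessageClient, QueryNode, and ReaderTimeSync, and converts the long-running loops to select on ctx.Done() so their goroutines exit once the context is cancelled or its deadline expires. A minimal, self-contained sketch of that loop pattern (consumerLoop and process are illustrative names, not identifiers from this codebase):

// Sketch of the shutdown pattern this commit applies: a worker loop that
// checks ctx.Done() on every iteration and returns when the context ends.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func consumerLoop(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			// Context cancelled or deadline exceeded: stop the loop.
			return
		default:
			process()
		}
	}
}

func process() {
	time.Sleep(10 * time.Millisecond) // stand-in for receiving and handling one message
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(1)
	go consumerLoop(ctx, &wg)

	time.Sleep(100 * time.Millisecond)
	cancel() // ask the worker to stop
	wg.Wait()
	fmt.Println("worker exited cleanly")
}

The same select-on-ctx.Done() shape appears below in receiveSearchMsg, receiveKey2SegMsg, RunInsertDelete, RunSearch, the segment services, and startTimeSync.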

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"flag"
"fmt"
"github.com/czs007/suvlim/conf"
@ -9,17 +10,19 @@ import (
)
func main() {
ctx, _ := context.WithCancel(context.Background())
var yamlFile string
flag.StringVar(&yamlFile, "yaml", "", "yaml file")
flag.Parse()
// flag.Usage()
fmt.Println("yaml file: ", yamlFile)
conf.LoadConfig(yamlFile)
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
reader.StartQueryNode(pulsarAddr)
reader.StartQueryNode(ctx, pulsarAddr)
}

View File

@ -14,6 +14,9 @@ import (
)
type MessageClient struct {
// context
ctx context.Context
// timesync
timeSyncCfg *timesync.ReaderTimeSyncCfg
@ -22,12 +25,12 @@ type MessageClient struct {
key2SegChan chan *msgpb.Key2SegMsg
// pulsar
client pulsar.Client
client pulsar.Client
//searchResultProducer pulsar.Producer
searchResultProducers map[int64]pulsar.Producer
segmentsStatisticProducer pulsar.Producer
searchConsumer pulsar.Consumer
key2segConsumer pulsar.Consumer
searchConsumer pulsar.Consumer
key2segConsumer pulsar.Consumer
// batch messages
InsertOrDeleteMsg []*msgpb.InsertOrDeleteMsg
@ -79,27 +82,45 @@ func (mc *MessageClient) GetSearchChan() <-chan *msgpb.SearchMsg {
func (mc *MessageClient) receiveSearchMsg() {
for {
searchMsg := msgpb.SearchMsg{}
msg, err := mc.searchConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &searchMsg)
if err != nil {
log.Fatal(err)
select {
case <-mc.ctx.Done():
return
default:
searchMsg := msgpb.SearchMsg{}
msg, err := mc.searchConsumer.Receive(mc.ctx)
if err != nil {
log.Println(err)
continue
}
err = proto.Unmarshal(msg.Payload(), &searchMsg)
if err != nil {
log.Fatal(err)
}
mc.searchChan <- &searchMsg
mc.searchConsumer.Ack(msg)
}
mc.searchChan <- &searchMsg
mc.searchConsumer.Ack(msg)
}
}
func (mc *MessageClient) receiveKey2SegMsg() {
for {
key2SegMsg := msgpb.Key2SegMsg{}
msg, err := mc.key2segConsumer.Receive(context.Background())
err = proto.Unmarshal(msg.Payload(), &key2SegMsg)
if err != nil {
log.Fatal(err)
select {
case <-mc.ctx.Done():
return
default:
key2SegMsg := msgpb.Key2SegMsg{}
msg, err := mc.key2segConsumer.Receive(mc.ctx)
if err != nil {
log.Println(err)
continue
}
err = proto.Unmarshal(msg.Payload(), &key2SegMsg)
if err != nil {
log.Fatal(err)
}
mc.key2SegChan <- &key2SegMsg
mc.key2segConsumer.Ack(msg)
}
mc.key2SegChan <- &key2SegMsg
mc.key2segConsumer.Ack(msg)
}
}
@ -141,7 +162,7 @@ func (mc *MessageClient) createClient(url string) pulsar.Client {
if conf.Config.Pulsar.Authentication {
// create client with Authentication
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: url,
URL: url,
Authentication: pulsar.NewAuthenticationToken(conf.Config.Pulsar.Token),
})
@ -162,7 +183,10 @@ func (mc *MessageClient) createClient(url string) pulsar.Client {
return client
}
func (mc *MessageClient) InitClient(url string) {
func (mc *MessageClient) InitClient(ctx context.Context, url string) {
// init context
mc.ctx = ctx
//create client
mc.client = mc.createClient(url)
mc.MessageClientID = conf.Config.Reader.ClientId
@ -185,7 +209,7 @@ func (mc *MessageClient) InitClient(url string) {
insertOrDeleteTopicName = "InsertOrDelete-" + conf.Config.Pulsar.User + "-"
}
for _, key := range proxyIdList{
for _, key := range proxyIdList {
topic := searchResultTopicName
topic = topic + strconv.Itoa(int(key))
mc.searchResultProducers[key] = mc.creatProducer(topic)
@ -217,7 +241,8 @@ func (mc *MessageClient) InitClient(url string) {
readSubName := "reader" + strconv.Itoa(mc.MessageClientID)
readerQueueSize := timesync.WithReaderQueueSize(conf.Config.Reader.ReaderQueueSize)
timeSync, err := timesync.NewReaderTimeSync(timeSyncTopic,
timeSync, err := timesync.NewReaderTimeSync(ctx,
timeSyncTopic,
timeSyncSubName,
readTopics,
readSubName,
@ -236,14 +261,26 @@ func (mc *MessageClient) InitClient(url string) {
}
func (mc *MessageClient) Close() {
mc.client.Close()
for key, _ := range mc.searchResultProducers {
mc.searchResultProducers[key].Close()
if mc.client != nil {
mc.client.Close()
}
for key, _ := range mc.searchResultProducers {
if mc.searchResultProducers[key] != nil {
mc.searchResultProducers[key].Close()
}
}
if mc.segmentsStatisticProducer != nil {
mc.segmentsStatisticProducer.Close()
}
if mc.searchConsumer != nil {
mc.searchConsumer.Close()
}
if mc.key2segConsumer != nil {
mc.key2segConsumer.Close()
}
if mc.timeSyncCfg != nil {
mc.timeSyncCfg.Close()
}
mc.segmentsStatisticProducer.Close()
mc.searchConsumer.Close()
mc.key2segConsumer.Close()
mc.timeSyncCfg.Close()
}
type MessageType int
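
The rewritten Close above also guards the client, each producer, the consumers, and the time-sync config with a nil check before closing. That matters because the tests construct a MessageClient or QueryNode without always completing initialization, and closing a nil Pulsar handle would panic during teardown. A small sketch of the idea, using illustrative types that are not part of this codebase:

// Sketch of nil-guarded teardown: resources created lazily (or not at all
// when init fails or a test skips it) must be checked before Close.
package main

import "fmt"

type resource struct{ name string }

func (r *resource) Close() { fmt.Println("closed", r.name) }

type client struct {
	producer *resource // may stay nil if initialization is skipped or fails early
	consumer *resource
}

func (c *client) Close() {
	if c.producer != nil {
		c.producer.Close()
	}
	if c.consumer != nil {
		c.consumer.Close()
	}
}

func main() {
	c := &client{consumer: &resource{name: "consumer"}}
	c.Close() // safe even though producer was never created
}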

View File

@ -1,6 +1,7 @@
package reader
import (
"context"
"encoding/binary"
"fmt"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
@ -12,7 +13,8 @@ import (
func TestIndex_BuildIndex(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
ctx := context.Background()
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -74,4 +76,5 @@ func TestIndex_BuildIndex(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}

View File

@ -1,7 +1,6 @@
package reader
import (
"context"
"fmt"
"log"
"path"
@ -274,12 +273,12 @@ func (node *QueryNode) InitFromMeta() error {
return nil
}
func (node *QueryNode) RunMetaService(ctx context.Context, wg *sync.WaitGroup) {
func (node *QueryNode) RunMetaService(wg *sync.WaitGroup) {
//node.InitFromMeta()
metaChan := node.kvBase.WatchWithPrefix("")
for {
select {
case <-ctx.Done():
case <-node.ctx.Done():
wg.Done()
println("DONE!!!!!!")
return

View File

@ -14,6 +14,7 @@ package reader
import "C"
import (
"context"
"encoding/json"
"fmt"
"github.com/czs007/suvlim/conf"
@ -87,6 +88,9 @@ type InsertLog struct {
}
type QueryNode struct {
// context
ctx context.Context
QueryNodeId uint64
Collections []*Collection
SegmentsMap map[int64]*Segment
@ -102,7 +106,7 @@ type QueryNode struct {
InsertLogs []InsertLog
}
func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
func NewQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64) *QueryNode {
mc := message_client.MessageClient{}
queryNodeTimeSync := &QueryNodeTime{
@ -129,6 +133,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
}
return &QueryNode{
ctx: ctx,
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
@ -140,11 +145,15 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
}
func (node *QueryNode) Close() {
node.messageClient.Close()
node.kvBase.Close()
if node.messageClient != nil {
node.messageClient.Close()
}
if node.kvBase != nil {
node.kvBase.Close()
}
}
func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.MessageClient) *QueryNode {
func CreateQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64, mc *message_client.MessageClient) *QueryNode {
queryNodeTimeSync := &QueryNodeTime{
ReadTimeSyncMin: timeSync,
ReadTimeSyncMax: timeSync,
@ -172,6 +181,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
}
return &QueryNode{
ctx: ctx,
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
@ -269,52 +279,64 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
if Debug {
for {
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
assert.NotEqual(nil, 0, timeRange.timestampMin)
assert.NotEqual(nil, 0, timeRange.timestampMax)
select {
case <-node.ctx.Done():
wg.Done()
return
default:
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
assert.NotEqual(nil, 0, timeRange.timestampMin)
assert.NotEqual(nil, 0, timeRange.timestampMax)
if node.msgCounter.InsertCounter/CountInsertMsgBaseline != BaselineCounter {
node.WriteQueryLog()
BaselineCounter = node.msgCounter.InsertCounter / CountInsertMsgBaseline
}
if node.msgCounter.InsertCounter/CountInsertMsgBaseline != BaselineCounter {
node.WriteQueryLog()
BaselineCounter = node.msgCounter.InsertCounter / CountInsertMsgBaseline
}
if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
continue
}
node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
//fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
node.PreInsertAndDelete()
//fmt.Println("PreInsertAndDelete Done")
node.DoInsertAndDelete()
//fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
continue
}
node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
//fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
node.PreInsertAndDelete()
//fmt.Println("PreInsertAndDelete Done")
node.DoInsertAndDelete()
//fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
}
} else {
for {
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
assert.NotEqual(nil, 0, timeRange.timestampMin)
assert.NotEqual(nil, 0, timeRange.timestampMax)
select {
case <-node.ctx.Done():
wg.Done()
return
default:
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
assert.NotEqual(nil, 0, timeRange.timestampMin)
assert.NotEqual(nil, 0, timeRange.timestampMax)
if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
continue
}
node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
//fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
node.PreInsertAndDelete()
//fmt.Println("PreInsertAndDelete Done")
node.DoInsertAndDelete()
//fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
continue
}
node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
//fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
node.PreInsertAndDelete()
//fmt.Println("PreInsertAndDelete Done")
node.DoInsertAndDelete()
//fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
}
}
wg.Done()
@ -336,6 +358,9 @@ func (node *QueryNode) TestInsertDelete(timeRange TimeRange) {
func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
for {
select {
case <-node.ctx.Done():
wg.Done()
return
case msg := <-node.messageClient.GetSearchChan():
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
@ -651,7 +676,6 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
}
}
var entities = msgPb.Entities{
Ids: make([]int64, 0),
}

View File

@ -7,13 +7,12 @@ import (
"sync"
)
func StartQueryNode(pulsarURL string) {
func StartQueryNode(ctx context.Context, pulsarURL string) {
mc := message_client.MessageClient{}
mc.InitClient(pulsarURL)
mc.InitClient(ctx, pulsarURL)
mc.ReceiveMessage()
qn := CreateQueryNode(0, 0, &mc)
ctx := context.Background()
qn := CreateQueryNode(ctx, 0, 0, &mc)
// Segments Services
go qn.SegmentManagementService()
@ -28,7 +27,7 @@ func StartQueryNode(pulsarURL string) {
}
wg.Add(3)
go qn.RunMetaService(ctx, &wg)
go qn.RunMetaService(&wg)
go qn.RunInsertDelete(&wg)
go qn.RunSearch(&wg)
wg.Wait()
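
StartQueryNode now receives its context from the caller, hands it to the message client and the query node constructor, and blocks on the WaitGroup until the ctx-aware loops return. A hedged sketch of how a caller might drive that shutdown; the signal wiring is an assumption for illustration (the main.go above still discards the cancel function), and run is a stand-in for reader.StartQueryNode:

// Sketch: translate OS signals into context cancellation so the ctx.Done()
// checks inside the worker loops unwind the goroutines. The program waits
// for Ctrl-C (SIGINT) or SIGTERM before exiting.
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func run(ctx context.Context) {
	<-ctx.Done() // stand-in for StartQueryNode's wg.Wait() over ctx-aware loops
	fmt.Println("query node stopped:", ctx.Err())
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sig
		cancel()
	}()

	run(ctx)
}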

View File

@ -1,22 +1,25 @@
package reader
import (
"context"
"github.com/czs007/suvlim/conf"
"strconv"
"testing"
"time"
)
const ctxTimeInMillisecond = 200
// NOTE: start pulsar before test
func TestReader_startQueryNode(t *testing.T) {
//pulsarURL := "pulsar://localhost:6650"
conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d)
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
println(pulsarAddr)
StartQueryNode(pulsarAddr)
//go StartQueryNode(pulsarAddr, 0)
//StartQueryNode(pulsarAddr, 1)
StartQueryNode(ctx, pulsarAddr)
}
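
The tests bound each Pulsar-backed run with context.WithDeadline and the shared ctxTimeInMillisecond constant, so the otherwise endless service loops return once the deadline fires. A standalone sketch of that pattern (doWork is illustrative; the real tests hand the context to StartQueryNode or CreateQueryNode):

// Sketch of a deadline-bounded run: the loop exits on its own once the
// context's deadline passes, which keeps the test from hanging.
package main

import (
	"context"
	"fmt"
	"time"
)

const ctxTimeInMillisecond = 200

func doWork(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stopped:", ctx.Err()) // context.DeadlineExceeded
			return
		default:
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func main() {
	d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
	ctx, cancel := context.WithDeadline(context.Background(), d)
	defer cancel() // release resources even if the deadline has not fired yet

	doWork(ctx)
}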

View File

@ -1,21 +1,39 @@
package reader
import (
"context"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/reader/message_client"
"strconv"
"testing"
"time"
masterPb "github.com/czs007/suvlim/pkg/master/grpc/master"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
)
// NOTE: start pulsar before test
func TestResult_PublishSearchResult(t *testing.T) {
conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d)
mc := message_client.MessageClient{}
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
mc.InitClient(ctx, pulsarAddr)
node := CreateQueryNode(ctx, 0, 0, &mc)
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
// TODO: start pulsar server
const N = 10
var entityIDs = msgPb.Entities{
Ids: make([]int64, N),
@ -29,11 +47,26 @@ func TestResult_PublishSearchResult(t *testing.T) {
result.Distances = append(result.Distances, float32(i))
}
node.PublishSearchResult(&result)
node.Close()
}
// NOTE: start pulsar before test
func TestResult_PublishFailedSearchResult(t *testing.T) {
conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d)
mc := message_client.MessageClient{}
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
mc.InitClient(ctx, pulsarAddr)
node := CreateQueryNode(ctx, 0, 0, &mc)
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -41,11 +74,27 @@ func TestResult_PublishFailedSearchResult(t *testing.T) {
// TODO: start pulsar server
node.PublishFailedSearchResult()
node.Close()
}
// NOTE: start pulsar before test
func TestResult_PublicStatistic(t *testing.T) {
conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d)
mc := message_client.MessageClient{}
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
mc.InitClient(ctx, pulsarAddr)
node := CreateQueryNode(ctx, 0, 0, &mc)
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -66,4 +115,6 @@ func TestResult_PublicStatistic(t *testing.T) {
// TODO: start pulsar server
node.PublicStatistic(&statisticData)
node.Close()
}

View File

@ -35,8 +35,13 @@ func (node *QueryNode) SegmentManagementService() {
sleepMillisecondTime := 1000
fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
node.SegmentsManagement()
select {
case <-node.ctx.Done():
return
default:
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
node.SegmentsManagement()
}
}
}
@ -91,7 +96,12 @@ func (node *QueryNode) SegmentStatisticService() {
sleepMillisecondTime := 1000
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
node.SegmentStatistic(sleepMillisecondTime)
select {
case <-node.ctx.Done():
return
default:
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
node.SegmentStatistic(sleepMillisecondTime)
}
}
}
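
The two segment services above keep their one-second time.Sleep inside the default branch, so cancellation is only observed after the current sleep finishes. A possible alternative, shown here only as a sketch and not as part of this commit, is to select on a ticker together with ctx.Done(); manage is a stand-in for node.SegmentsManagement or node.SegmentStatistic:

// Sketch: a ticker-driven loop reacts to cancellation immediately instead of
// waiting out the remainder of a sleep.
package main

import (
	"context"
	"fmt"
	"time"
)

func managementLoop(ctx context.Context, interval time.Duration, manage func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			manage()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3500*time.Millisecond)
	defer cancel()
	managementLoop(ctx, time.Second, func() { fmt.Println("do segments management") })
}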

View File

@ -1,53 +1,93 @@
package reader
import (
"context"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/reader/message_client"
"strconv"
"testing"
"time"
)
func TestSegmentManagement_SegmentsManagement(t *testing.T) {
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
ctx := context.Background()
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
// TODO: fix segment management
node.SegmentsManagement()
node.Close()
}
func TestSegmentManagement_SegmentService(t *testing.T) {
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d)
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
// TODO: fix segment service
node.SegmentManagementService()
node.Close()
}
// NOTE: start pulsar before test
func TestSegmentManagement_SegmentStatistic(t *testing.T) {
conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background())
mc := message_client.MessageClient{}
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
mc.InitClient(ctx, pulsarAddr)
mc.ReceiveMessage()
node := CreateQueryNode(ctx, 0, 0, &mc)
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
// TODO: start pulsar server
node.SegmentStatistic(1000)
node.Close()
}
// NOTE: start pulsar before test
func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d)
mc := message_client.MessageClient{}
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
mc.InitClient(ctx, pulsarAddr)
mc.ReceiveMessage()
node := CreateQueryNode(ctx, 0, 0, &mc)
// Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
node.SegmentsMap[0] = segment
// TODO: start pulsar server
node.SegmentStatisticService()
node.Close()
}

View File

@ -1,6 +1,7 @@
package reader
import (
"context"
"encoding/binary"
"fmt"
"math"
@ -12,7 +13,8 @@ import (
func TestSegment_ConstructorAndDestructor(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
ctx := context.Background()
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -21,11 +23,14 @@ func TestSegment_ConstructorAndDestructor(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_SegmentInsert(t *testing.T) {
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
ctx := context.Background()
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -66,11 +71,14 @@ func TestSegment_SegmentInsert(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_SegmentDelete(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -91,11 +99,14 @@ func TestSegment_SegmentDelete(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_SegmentSearch(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -152,11 +163,14 @@ func TestSegment_SegmentSearch(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_SegmentPreInsert(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -169,11 +183,14 @@ func TestSegment_SegmentPreInsert(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_SegmentPreDelete(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -186,13 +203,16 @@ func TestSegment_SegmentPreDelete(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
// Segment util functions test
////////////////////////////////////////////////////////////////////////////
func TestSegment_GetStatus(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -205,11 +225,14 @@ func TestSegment_GetStatus(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_Close(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -222,11 +245,14 @@ func TestSegment_Close(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_GetRowCount(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -271,11 +297,14 @@ func TestSegment_GetRowCount(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_GetDeletedCount(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -301,11 +330,14 @@ func TestSegment_GetDeletedCount(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_GetMemSize(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -344,22 +376,28 @@ func TestSegment_GetMemSize(t *testing.T) {
// 6. Get memory usage in bytes
var memSize = segment.GetMemSize()
assert.Equal(t, memSize, uint64(1048714))
assert.Equal(t, memSize, uint64(2785280))
// 7. Destruct collection, partition and segment
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}
func TestSegment_RealSchemaTest(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
//var schemaString = "id: 6873737669791618215\nname: \"collection0\"\nschema: \u003c\n " +
// "field_metas: \u003c\n field_name: \"age\"\n type: INT32\n dim: 1\n \u003e\n " +
// "field_metas: \u003c\n field_name: \"field_1\"\n type: VECTOR_FLOAT\n dim: 16\n \u003e\n" +
// "\u003e\ncreate_time: 1600416765\nsegment_ids: 6873737669791618215\npartition_tags: \"default\"\n"
var schemaString = "id: 6875229265736357360\nname: \"collection0\"\nschema: \u003c\n field_metas: \u003c\n field_name: \"field_3\"\n type: INT32\n \u003e\n field_metas: \u003c\n field_name: \"field_vec\"\n type: VECTOR_FLOAT\n \u003e\n\u003e\ncreate_time: 1600764055\nsegment_ids: 6875229265736357360\npartition_tags: \"default\"\n"
node := NewQueryNode(0, 0)
var schemaString = "id: 6875229265736357360\nname: \"collection0\"\nschema: \u003c\n " +
"field_metas: \u003c\n field_name: \"field_3\"\n type: INT32\n dim: 1\n \u003e\n " +
"field_metas: \u003c\n field_name: \"field_vec\"\n type: VECTOR_FLOAT\n dim: 16\n " +
"\u003e\n\u003e\ncreate_time: 1600764055\nsegment_ids: 6875229265736357360\npartition_tags: \"default\"\n"
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", schemaString)
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -400,4 +438,6 @@ func TestSegment_RealSchemaTest(t *testing.T) {
partition.DeleteSegment(segment)
collection.DeletePartition(partition)
node.DeleteCollection(collection)
node.Close()
}

View File

@ -1,18 +1,42 @@
package reader
import (
"context"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/reader/message_client"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// NOTE: start pulsar before test
func TestUtilFunctions_GetKey2Segments(t *testing.T) {
// TODO: Add GetKey2Segments test
conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d)
mc := message_client.MessageClient{}
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
mc.InitClient(ctx, pulsarAddr)
mc.ReceiveMessage()
node := CreateQueryNode(ctx, 0, 0, &mc)
node.messageClient.PrepareKey2SegmentMsg()
var _, _, _ = node.GetKey2Segments()
node.Close()
}
func TestUtilFunctions_GetCollectionByCollectionName(t *testing.T) {
ctx := context.Background()
// 1. Construct node, and collections
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var _ = node.NewCollection(0, "collection0", "")
// 2. Get collection by collectionName
@ -21,12 +45,15 @@ func TestUtilFunctions_GetCollectionByCollectionName(t *testing.T) {
assert.Equal(t, c0.CollectionName, "collection0")
c0 = node.GetCollectionByID(0)
assert.NotNil(t, c0)
assert.Equal(t, c0.CollectionID, 0)
assert.Equal(t, c0.CollectionID, uint64(0))
node.Close()
}
func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) {
ctx := context.Background()
// 1. Construct node, collection, partition and segment
node := NewQueryNode(0, 0)
node := NewQueryNode(ctx, 0, 0)
var collection = node.NewCollection(0, "collection0", "")
var partition = collection.NewPartition("partition0")
var segment = partition.NewSegment(0)
@ -36,4 +63,6 @@ func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) {
var s0, err = node.GetSegmentBySegmentID(0)
assert.NoError(t, err)
assert.Equal(t, s0.SegmentId, int64(0))
node.Close()
}

View File

@ -66,7 +66,6 @@ type ReaderTimeSyncCfg struct {
revTimesyncFromReader map[uint64]int
ctx context.Context
cancel context.CancelFunc
InsertLogs []InsertLog
RoleType TimeSyncRole
}
@ -83,6 +82,7 @@ func toMillisecond(ts *pb.TimeSyncMsg) int {
}
func NewReaderTimeSync(
ctx context.Context,
timeSyncTopic string,
timeSyncSubName string,
readTopics []string,
@ -133,7 +133,7 @@ func NewReaderTimeSync(
r.timesyncMsgChan = make(chan TimeSyncMsg, len(readTopics)*r.readerQueueSize)
r.insertOrDeleteChan = make(chan *pb.InsertOrDeleteMsg, len(readTopics)*r.readerQueueSize)
r.revTimesyncFromReader = make(map[uint64]int)
r.ctx, r.cancel = context.WithCancel(context.Background())
r.ctx = ctx
var client pulsar.Client
var err error
@ -186,13 +186,20 @@ func NewReaderTimeSync(
}
func (r *ReaderTimeSyncCfg) Close() {
r.cancel()
r.timeSyncConsumer.Close()
r.readerConsumer.Close()
for i := 0; i < len(r.readerProducer); i++ {
r.readerProducer[i].Close()
if r.timeSyncConsumer != nil {
r.timeSyncConsumer.Close()
}
if r.readerConsumer != nil {
r.readerConsumer.Close()
}
for i := 0; i < len(r.readerProducer); i++ {
if r.readerProducer[i] != nil {
r.readerProducer[i].Close()
}
}
if r.pulsarClient != nil {
r.pulsarClient.Close()
}
r.pulsarClient.Close()
}
func (r *ReaderTimeSyncCfg) Start() error {
@ -278,43 +285,48 @@ func (r *ReaderTimeSyncCfg) sendEOFMsg(ctx context.Context, msg *pulsar.Producer
}
func (r *ReaderTimeSyncCfg) startTimeSync() {
ctx := r.ctx
tsm := make([]*pb.TimeSyncMsg, 0, len(r.proxyIdList)*2)
ctx, _ := context.WithCancel(r.ctx)
var err error
for {
//var start time.Time
for len(tsm) != len(r.proxyIdList) {
tsm = r.alignTimeSync(tsm)
tsm, err = r.readTimeSync(ctx, tsm, len(r.proxyIdList)-len(tsm))
if err != nil {
if ctx.Err() != nil {
return
} else {
//TODO, log error msg
log.Printf("read time sync error %v", err)
select {
case <-ctx.Done():
return
default:
//var start time.Time
for len(tsm) != len(r.proxyIdList) {
tsm = r.alignTimeSync(tsm)
tsm, err = r.readTimeSync(ctx, tsm, len(r.proxyIdList)-len(tsm))
if err != nil {
if ctx.Err() != nil {
return
} else {
//TODO, log error msg
log.Printf("read time sync error %v", err)
}
}
}
}
ts := tsm[0].Timestamp
for i := 1; i < len(tsm); i++ {
if tsm[i].Timestamp < ts {
ts = tsm[i].Timestamp
ts := tsm[0].Timestamp
for i := 1; i < len(tsm); i++ {
if tsm[i].Timestamp < ts {
ts = tsm[i].Timestamp
}
}
}
tsm = tsm[:0]
//send timestamp flag to reader channel
msg := pb.InsertOrDeleteMsg{Timestamp: ts, ClientId: r.readStopFlagClientId}
payload, err := proto.Marshal(&msg)
if err != nil {
//TODO log error
log.Printf("Marshal timesync flag error %v", err)
} else {
wg := sync.WaitGroup{}
wg.Add(len(r.readerProducer))
for index := range r.readerProducer {
go r.sendEOFMsg(ctx, &pulsar.ProducerMessage{Payload: payload}, index, &wg)
tsm = tsm[:0]
//send timestamp flag to reader channel
msg := pb.InsertOrDeleteMsg{Timestamp: ts, ClientId: r.readStopFlagClientId}
payload, err := proto.Marshal(&msg)
if err != nil {
//TODO log error
log.Printf("Marshal timesync flag error %v", err)
} else {
wg := sync.WaitGroup{}
wg.Add(len(r.readerProducer))
for index := range r.readerProducer {
go r.sendEOFMsg(ctx, &pulsar.ProducerMessage{Payload: payload}, index, &wg)
}
wg.Wait()
}
wg.Wait()
}
}
}
@ -362,7 +374,7 @@ func (r *ReaderTimeSyncCfg) WriteInsertLog() {
}
func (r *ReaderTimeSyncCfg) startReadTopics() {
ctx, _ := context.WithCancel(r.ctx)
ctx := r.ctx
tsm := TimeSyncMsg{Timestamp: 0, NumRecorders: 0}
const Debug = true
const WriterBaseline = 1000 * 1000