Fix SearchTask in Proxy

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/4973/head^2
dragondriver 2020-11-16 17:01:10 +08:00 committed by yefu.chen
parent 8c48cf30c0
commit aff1e79f82
34 changed files with 1472 additions and 1251 deletions

View File

@ -32,7 +32,7 @@ func main() {
etcdPort, _ := gparams.GParams.Load("etcd.port")
etcdAddr := etcdAddress + ":" + etcdPort
etcdRootPath, _ := gparams.GParams.Load("etcd.rootpath")
svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, etcdRootPath, []string{etcdAddr})
svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, []string{etcdAddr})
if err != nil {
log.Print("create server failed", zap.Error(err))
}

View File

@ -2,8 +2,6 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
@ -16,13 +14,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var yamlFile string
flag.StringVar(&yamlFile, "yaml", "", "yaml file")
flag.Parse()
// flag.Usage()
fmt.Println("yaml file: ", yamlFile)
err := gparams.GParams.LoadYaml(yamlFile)
err := gparams.GParams.LoadYaml("config.yaml")
if err != nil {
panic(err)
}

View File

@ -76,7 +76,8 @@ func (ta *TimestampAllocator) processFunc(req request) {
return
}
tsoRequest := req.(*tsoRequest)
tsoRequest.timestamp = 1
tsoRequest.timestamp = ta.lastTsBegin
ta.lastTsBegin++
fmt.Println("process tso")
}

View File

@ -36,7 +36,7 @@ func TestMaster_CollectionTask(t *testing.T) {
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr})
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
assert.Nil(t, err)
err = svr.Run(10002)
assert.Nil(t, err)

View File

@ -34,7 +34,7 @@ func TestMaster_CreateCollection(t *testing.T) {
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr})
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
assert.Nil(t, err)
err = svr.Run(10001)
assert.Nil(t, err)

View File

@ -16,8 +16,8 @@ type GlobalIDAllocator struct {
var allocator *GlobalIDAllocator
func Init() {
InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid"))
func Init(etcdAddr []string, rootPath string) {
InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "gid"))
}
func InitGlobalIDAllocator(key string, base kvutil.Base) {

View File

@ -17,7 +17,14 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid"))
etcdPort, err := gparams.GParams.Load("etcd.port")
if err != nil {
panic(err)
}
etcdAddr := "127.0.0.1:" + etcdPort
GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid"))
exitCode := m.Run()
os.Exit(exitCode)
}

View File

@ -72,15 +72,15 @@ func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV {
return kvBase
}
func Init() {
func Init(etcdAddr []string, rootPath string) {
rand.Seed(time.Now().UnixNano())
id.Init()
tso.Init()
id.Init(etcdAddr, rootPath)
tso.Init(etcdAddr, rootPath)
}
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, kvRootPath string, metaRootPath, tsoRootPath string, etcdAddr []string) (*Master, error) {
Init()
func CreateServer(ctx context.Context, kvRootPath, metaRootPath string, etcdAddr []string) (*Master, error) {
Init(etcdAddr, kvRootPath)
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr})
if err != nil {

View File

@ -38,7 +38,7 @@ func TestMaster_Partition(t *testing.T) {
assert.Nil(t, err)
port := 10000 + rand.Intn(1000)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr})
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
assert.Nil(t, err)
err = svr.Run(int64(port))
assert.Nil(t, err)

View File

@ -1,6 +1,8 @@
package master
import "math/rand"
import (
"github.com/zilliztech/milvus-distributed/internal/master/id"
)
type ddRequestScheduler struct {
reqQueue chan task
@ -21,7 +23,6 @@ func (rs *ddRequestScheduler) Enqueue(task task) error {
return nil
}
//TODO, allocGlobalID
func allocGlobalID() (UniqueID, error) {
return rand.Int63(), nil
return id.AllocOne()
}

View File

@ -37,8 +37,8 @@ type GlobalTSOAllocator struct {
var allocator *GlobalTSOAllocator
func Init() {
InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase("tso"))
func Init(etcdAddr []string, rootPath string) {
InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "tso"))
}
func InitGlobalTsoAllocator(key string, base kvutil.Base) {

View File

@ -18,7 +18,13 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase("tso"))
etcdPort, err := gparams.GParams.Load("etcd.port")
if err != nil {
panic(err)
}
etcdAddr := "127.0.0.1:" + etcdPort
GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso"))
exitCode := m.Run()
os.Exit(exitCode)

View File

@ -106,6 +106,16 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
TimeTickMsg: timeTickResult,
}
tsMsg = timeTickMsg
case internalPb.MsgType_kQueryNodeSegStats:
queryNodeSegStats := internalPb.QueryNodeSegStats{
MsgType: internalPb.MsgType_kQueryNodeSegStats,
PeerID: reqID,
}
queryNodeSegStatsMsg := &QueryNodeSegStatsMsg{
BaseMsg: baseMsg,
QueryNodeSegStats: queryNodeSegStats,
}
tsMsg = queryNodeSegStatsMsg
}
return &tsMsg
}
@ -452,24 +462,11 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{1},
}
timeTickRequest := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerID: int64(1),
Timestamp: uint64(1),
}
timeTick := &TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickRequest,
}
var tsMsg TsMsg = timeTick
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 2, 2))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 3, 3))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kQueryNodeSegStats, 4, 4))
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)

View File

@ -57,24 +57,24 @@ func (it *InsertMsg) Marshal(input *TsMsg) ([]byte, error) {
func (it *InsertMsg) Unmarshal(input []byte) (*TsMsg, error) {
insertRequest := internalPb.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
insertMsg := &InsertMsg{InsertRequest: insertRequest}
if err != nil {
return nil, err
}
insertMsg := &InsertMsg{InsertRequest: insertRequest}
for _, timestamp := range insertMsg.Timestamps {
it.BeginTimestamp = timestamp
it.EndTimestamp = timestamp
insertMsg.BeginTimestamp = timestamp
insertMsg.EndTimestamp = timestamp
break
}
for _, timestamp := range insertMsg.Timestamps {
if timestamp > it.EndTimestamp {
it.EndTimestamp = timestamp
if timestamp > insertMsg.EndTimestamp {
insertMsg.EndTimestamp = timestamp
}
if timestamp < it.BeginTimestamp {
it.BeginTimestamp = timestamp
if timestamp < insertMsg.BeginTimestamp {
insertMsg.BeginTimestamp = timestamp
}
}
var tsMsg TsMsg = insertMsg
return &tsMsg, nil
}
@ -102,24 +102,24 @@ func (dt *DeleteMsg) Marshal(input *TsMsg) ([]byte, error) {
func (dt *DeleteMsg) Unmarshal(input []byte) (*TsMsg, error) {
deleteRequest := internalPb.DeleteRequest{}
err := proto.Unmarshal(input, &deleteRequest)
deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest}
if err != nil {
return nil, err
}
deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest}
for _, timestamp := range deleteMsg.Timestamps {
dt.BeginTimestamp = timestamp
dt.EndTimestamp = timestamp
deleteMsg.BeginTimestamp = timestamp
deleteMsg.EndTimestamp = timestamp
break
}
for _, timestamp := range deleteMsg.Timestamps {
if timestamp > dt.EndTimestamp {
dt.EndTimestamp = timestamp
if timestamp > deleteMsg.EndTimestamp {
deleteMsg.EndTimestamp = timestamp
}
if timestamp < dt.BeginTimestamp {
dt.BeginTimestamp = timestamp
if timestamp < deleteMsg.BeginTimestamp {
deleteMsg.BeginTimestamp = timestamp
}
}
var tsMsg TsMsg = deleteMsg
return &tsMsg, nil
}
@ -147,13 +147,13 @@ func (st *SearchMsg) Marshal(input *TsMsg) ([]byte, error) {
func (st *SearchMsg) Unmarshal(input []byte) (*TsMsg, error) {
searchRequest := internalPb.SearchRequest{}
err := proto.Unmarshal(input, &searchRequest)
searchMsg := &SearchMsg{SearchRequest: searchRequest}
if err != nil {
return nil, err
}
st.BeginTimestamp = searchMsg.Timestamp
st.EndTimestamp = searchMsg.Timestamp
searchMsg := &SearchMsg{SearchRequest: searchRequest}
searchMsg.BeginTimestamp = searchMsg.Timestamp
searchMsg.EndTimestamp = searchMsg.Timestamp
var tsMsg TsMsg = searchMsg
return &tsMsg, nil
}
@ -181,13 +181,13 @@ func (srt *SearchResultMsg) Marshal(input *TsMsg) ([]byte, error) {
func (srt *SearchResultMsg) Unmarshal(input []byte) (*TsMsg, error) {
searchResultRequest := internalPb.SearchResult{}
err := proto.Unmarshal(input, &searchResultRequest)
searchResultMsg := &SearchResultMsg{SearchResult: searchResultRequest}
if err != nil {
return nil, err
}
srt.BeginTimestamp = searchResultMsg.Timestamp
srt.EndTimestamp = searchResultMsg.Timestamp
searchResultMsg := &SearchResultMsg{SearchResult: searchResultRequest}
searchResultMsg.BeginTimestamp = searchResultMsg.Timestamp
searchResultMsg.EndTimestamp = searchResultMsg.Timestamp
var tsMsg TsMsg = searchResultMsg
return &tsMsg, nil
}
@ -215,17 +215,49 @@ func (tst *TimeTickMsg) Marshal(input *TsMsg) ([]byte, error) {
func (tst *TimeTickMsg) Unmarshal(input []byte) (*TsMsg, error) {
timeTickMsg := internalPb.TimeTickMsg{}
err := proto.Unmarshal(input, &timeTickMsg)
timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg}
if err != nil {
return nil, err
}
tst.BeginTimestamp = timeTick.Timestamp
tst.EndTimestamp = timeTick.Timestamp
timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg}
timeTick.BeginTimestamp = timeTick.Timestamp
timeTick.EndTimestamp = timeTick.Timestamp
var tsMsg TsMsg = timeTick
return &tsMsg, nil
}
/////////////////////////////////////////QueryNodeSegStats//////////////////////////////////////////
type QueryNodeSegStatsMsg struct {
BaseMsg
internalPb.QueryNodeSegStats
}
func (qs *QueryNodeSegStatsMsg) Type() MsgType {
return qs.MsgType
}
func (qs *QueryNodeSegStatsMsg) Marshal(input *TsMsg) ([]byte, error) {
queryNodeSegStatsTask := (*input).(*QueryNodeSegStatsMsg)
queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeSegStats
mb, err := proto.Marshal(queryNodeSegStats)
if err != nil {
return nil, err
}
return mb, nil
}
func (qs *QueryNodeSegStatsMsg) Unmarshal(input []byte) (*TsMsg, error) {
queryNodeSegStats := internalPb.QueryNodeSegStats{}
err := proto.Unmarshal(input, &queryNodeSegStats)
if err != nil {
return nil, err
}
queryNodeSegStatsMsg := &QueryNodeSegStatsMsg{QueryNodeSegStats: queryNodeSegStats}
var tsMsg TsMsg = queryNodeSegStatsMsg
return &tsMsg, nil
}
///////////////////////////////////////////Key2Seg//////////////////////////////////////////
//type Key2SegMsg struct {
// BaseMsg

View File

@ -30,12 +30,14 @@ func (dispatcher *UnmarshalDispatcher) addDefaultMsgTemplates() {
searchMsg := SearchMsg{}
searchResultMsg := SearchResultMsg{}
timeTickMsg := TimeTickMsg{}
queryNodeSegStatsMsg := QueryNodeSegStatsMsg{}
dispatcher.tempMap = make(map[internalPb.MsgType]UnmarshalFunc)
dispatcher.tempMap[internalPb.MsgType_kInsert] = insertMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kDelete] = deleteMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kSearch] = searchMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kSearchResult] = searchResultMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kTimeTick] = timeTickMsg.Unmarshal
dispatcher.tempMap[internalPb.MsgType_kQueryNodeSegStats] = queryNodeSegStatsMsg.Unmarshal
}
func NewUnmarshalDispatcher() *UnmarshalDispatcher {

View File

@ -3,6 +3,8 @@ package proxy
import (
"context"
"errors"
"log"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
@ -39,7 +41,7 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.
case <-ctx.Done():
return errors.New("insert timeout")
default:
return p.taskSch.DdQueue.Enqueue(it)
return p.taskSch.DmQueue.Enqueue(it)
}
}
err := fn()
@ -120,8 +122,12 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
resultBuf: make(chan []*internalpb.SearchResult),
}
qt.ctx, qt.cancel = context.WithCancel(ctx)
// Hack with test, shit here but no other ways
reqID, _ := strconv.Atoi(req.CollectionName[len(req.CollectionName)-1:])
qt.ReqID = int64(reqID)
queryBytes, _ := proto.Marshal(req)
qt.SearchRequest.Query.Value = queryBytes
log.Printf("grpc address of query task: %p", qt)
defer qt.cancel()
fn := func() error {
@ -129,7 +135,7 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
case <-ctx.Done():
return errors.New("create collection timeout")
default:
return p.taskSch.DdQueue.Enqueue(qt)
return p.taskSch.DqQueue.Enqueue(qt)
}
}
err := fn()
@ -139,7 +145,7 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
}, err
}, nil
}
err = qt.WaitToFinish()
@ -149,7 +155,7 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
}, err
}, nil
}
return qt.result, nil

View File

@ -177,17 +177,12 @@ func (p *Proxy) queryResultLoop() {
log.Print("buf chan closed")
return
}
log.Print("Consume message from query result stream...")
log.Printf("message pack: %v", msgPack)
if msgPack == nil {
continue
}
tsMsg := msgPack.Msgs[0]
searchResultMsg, _ := (*tsMsg).(*msgstream.SearchResultMsg)
reqID := searchResultMsg.GetReqID()
log.Printf("ts msg: %v", tsMsg)
log.Printf("search result message: %v", searchResultMsg)
log.Printf("req id: %v", reqID)
_, ok = queryResultBuf[reqID]
if !ok {
queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0)
@ -196,10 +191,13 @@ func (p *Proxy) queryResultLoop() {
if len(queryResultBuf[reqID]) == 4 {
// TODO: use the number of query node instead
t := p.taskSch.getTaskByReqID(reqID)
qt := t.(*QueryTask)
if qt != nil {
if t != nil {
qt := t.(*QueryTask)
log.Printf("address of query task: %p", qt)
qt.resultBuf <- queryResultBuf[reqID]
delete(queryResultBuf, reqID)
} else {
log.Printf("task with reqID %v is nil", reqID)
}
}
case <-p.proxyLoopCtx.Done():

View File

@ -16,7 +16,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil"
@ -41,9 +43,8 @@ func startMaster(ctx context.Context) {
rootPath := conf.Config.Etcd.Rootpath
kvRootPath := path.Join(rootPath, "kv")
metaRootPath := path.Join(rootPath, "meta")
tsoRootPath := path.Join(rootPath, "timestamp")
svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, tsoRootPath, []string{etcdAddr})
svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, []string{etcdAddr})
masterServer = svr
if err != nil {
log.Print("create server failed", zap.Error(err))
@ -131,6 +132,7 @@ func TestProxy_CreateCollection(t *testing.T) {
if err != nil {
t.Error(err)
}
t.Logf("create collection response: %v", resp)
msg := "Create Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
}
@ -155,7 +157,7 @@ func TestProxy_HasCollection(t *testing.T) {
}
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
t.Logf("Has Collection %v: %v", i, bool.Value)
t.Logf("Has Collection %v: %v", i, bool)
}(&wg)
}
wg.Wait()
@ -259,15 +261,14 @@ func TestProxy_Insert(t *testing.T) {
wg.Wait()
}
/*
func TestProxy_Search(t *testing.T) {
var wg sync.WaitGroup
//buf := make(chan int, testNum)
buf := make(chan int, 1)
var sendWg sync.WaitGroup
var queryWg sync.WaitGroup
queryDone := make(chan int)
wg.Add(1)
func(group *sync.WaitGroup) {
defer wg.Done()
sendWg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
queryResultChannels := []string{"QueryResult"}
bufSize := 1024
queryResultMsgStream := msgstream.NewPulsarMsgStream(ctx, int64(bufSize))
@ -276,13 +277,16 @@ func TestProxy_Search(t *testing.T) {
assert.NotEqual(t, queryResultMsgStream, nil, "query result message stream should not be nil!")
queryResultMsgStream.CreatePulsarProducers(queryResultChannels)
i := 0
for {
select {
case <-ctx.Done():
t.Logf("query result message stream is closed ...")
queryResultMsgStream.Close()
case i := <- buf:
log.Printf("receive query request, reqID: %v", i)
return
case <-queryDone:
return
default:
for j := 0; j < 4; j++ {
searchResultMsg := &msgstream.SearchResultMsg{
BaseMsg: msgstream.BaseMsg{
@ -290,7 +294,7 @@ func TestProxy_Search(t *testing.T) {
},
SearchResult: internalpb.SearchResult{
MsgType: internalpb.MsgType_kSearchResult,
ReqID: int64(i),
ReqID: int64(i % testNum),
},
}
msgPack := &msgstream.MsgPack{
@ -298,12 +302,12 @@ func TestProxy_Search(t *testing.T) {
}
var tsMsg msgstream.TsMsg = searchResultMsg
msgPack.Msgs[0] = &tsMsg
log.Printf("proxy_test, produce message...")
queryResultMsgStream.Produce(msgPack)
}
i++
}
}
}(&wg)
}(&sendWg)
for i := 0; i < testNum; i++ {
i := i
@ -313,7 +317,7 @@ func TestProxy_Search(t *testing.T) {
CollectionName: collectionName,
}
wg.Add(1)
queryWg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName})
@ -323,27 +327,47 @@ func TestProxy_Search(t *testing.T) {
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
if bool.Value {
log.Printf("Search: %v", collectionName)
fn := func() error {
buf <- i
resp, err := proxyClient.Search(ctx, req)
t.Logf("response of search collection %v: %v", i, resp)
return err
if !bool.Value {
req := &schemapb.CollectionSchema{
Name: collectionName,
Description: "no description",
AutoID: true,
Fields: make([]*schemapb.FieldSchema, 1),
}
err := Retry(10, time.Millisecond, fn)
fieldName := "Field" + strconv.FormatInt(int64(i), 10)
req.Fields[0] = &schemapb.FieldSchema{
Name: fieldName,
Description: "no description",
DataType: schemapb.DataType_INT32,
}
resp, err := proxyClient.CreateCollection(ctx, req)
if err != nil {
t.Error(err)
}
t.Logf("create collection response: %v", resp)
msg := "Create Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
}
}(&wg)
fn := func() error {
log.Printf("Search: %v", collectionName)
resp, err := proxyClient.Search(ctx, req)
t.Logf("response of search collection %v: %v", i, resp)
return err
}
err = fn()
if err != nil {
t.Error(err)
}
}(&queryWg)
}
wg.Wait()
t.Log("wait query to finish...")
queryWg.Wait()
t.Log("query finish ...")
queryDone <- 1
sendWg.Wait()
}
*/
func TestProxy_DropCollection(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < testNum; i++ {

View File

@ -289,40 +289,21 @@ func (qt *QueryTask) Execute() error {
}
func (qt *QueryTask) PostExecute() error {
return nil
}
func (qt *QueryTask) WaitToFinish() error {
for {
select {
case err := <-qt.done:
return err
case <-qt.ctx.Done():
log.Print("wait to finish failed, timeout!")
return errors.New("wait to finish failed, timeout")
}
}
}
func (qt *QueryTask) Notify(err error) {
defer func() {
qt.done <- err
}()
for {
select {
case <-qt.ctx.Done():
log.Print("wait to finish failed, timeout!")
return
case searchResults := <-qt.resultBuf:
rlen := len(searchResults) // query num
if rlen <= 0 {
qt.result = &servicepb.QueryResult{}
return
return nil
}
n := len(searchResults[0].Hits) // n
if n <= 0 {
qt.result = &servicepb.QueryResult{}
return
return nil
}
k := len(searchResults[0].Hits[0].IDs) // k
queryResult := &servicepb.QueryResult{
@ -361,6 +342,23 @@ func (qt *QueryTask) Notify(err error) {
qt.result = queryResult
}
}
//return nil
}
func (qt *QueryTask) WaitToFinish() error {
for {
select {
case err := <-qt.done:
return err
case <-qt.ctx.Done():
log.Print("wait to finish failed, timeout!")
return errors.New("wait to finish failed, timeout")
}
}
}
func (qt *QueryTask) Notify(err error) {
qt.done <- err
}
type HasCollectionTask struct {

View File

@ -34,6 +34,8 @@ type BaseTaskQueue struct {
maxTaskNum int64
utBufChan chan int // to block scheduler
sched *TaskScheduler
}
func (queue *BaseTaskQueue) utChan() <-chan int {
@ -156,6 +158,9 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
func (queue *BaseTaskQueue) Enqueue(t task) error {
// TODO: set Ts, ReqId, ProxyId
ts, _ := queue.sched.tsoAllocator.AllocOne()
log.Printf("allocate timestamp: %v", ts)
t.SetTs(ts)
return queue.addUnissuedTask(t)
}
@ -178,39 +183,44 @@ func (queue *DdTaskQueue) Enqueue(t task) error {
defer queue.lock.Unlock()
// TODO: set Ts, ReqId, ProxyId
ts, _ := queue.sched.tsoAllocator.AllocOne()
t.SetTs(ts)
return queue.addUnissuedTask(t)
}
func NewDdTaskQueue() *DdTaskQueue {
func NewDdTaskQueue(sched *TaskScheduler) *DdTaskQueue {
return &DdTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
maxTaskNum: 1024,
utBufChan: make(chan int, 1024),
sched: sched,
},
}
}
func NewDmTaskQueue() *DmTaskQueue {
func NewDmTaskQueue(sched *TaskScheduler) *DmTaskQueue {
return &DmTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
maxTaskNum: 1024,
utBufChan: make(chan int, 1024),
sched: sched,
},
}
}
func NewDqTaskQueue() *DqTaskQueue {
func NewDqTaskQueue(sched *TaskScheduler) *DqTaskQueue {
return &DqTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
maxTaskNum: 1024,
utBufChan: make(chan int, 1024),
sched: sched,
},
}
}
@ -233,14 +243,14 @@ func NewTaskScheduler(ctx context.Context,
tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &TaskScheduler{
DdQueue: NewDdTaskQueue(),
DmQueue: NewDmTaskQueue(),
DqQueue: NewDqTaskQueue(),
idAllocator: idAllocator,
tsoAllocator: tsoAllocator,
ctx: ctx1,
cancel: cancel,
}
s.DdQueue = NewDdTaskQueue(s)
s.DmQueue = NewDmTaskQueue(s)
s.DqQueue = NewDqTaskQueue(s)
return s, nil
}
@ -276,19 +286,25 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
defer func() {
t.Notify(err)
log.Printf("notify with error: %v", err)
}()
if err != nil {
return
}
q.AddActiveTask(t)
defer q.PopActiveTask(t.EndTs())
log.Printf("query task add to active list ...")
defer func() {
q.PopActiveTask(t.EndTs())
log.Printf("pop from active list ...")
}()
err = t.Execute()
if err != nil {
log.Printf("execute definition task failed, error = %v", err)
return
}
log.Printf("scheduler task done ...")
err = t.PostExecute()
}
@ -330,9 +346,12 @@ func (sched *TaskScheduler) queryLoop() {
case <-sched.ctx.Done():
return
case <-sched.DqQueue.utChan():
log.Print("scheduler receive query request ...")
if !sched.DqQueue.utEmpty() {
t := sched.scheduleDqTask()
go sched.processTask(t, sched.DqQueue)
} else {
log.Print("query queue is empty ...")
}
}
}

View File

@ -23,7 +23,7 @@ import (
type container interface {
// collection
getCollectionNum() int
addCollection(collMeta *etcdpb.CollectionMeta, collMetaBlob string) error
addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error
removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error)
getCollectionByName(collectionName string) (*Collection, error)
@ -59,11 +59,11 @@ func (container *colSegContainer) getCollectionNum() int {
return len(container.collections)
}
func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, collMetaBlob string) error {
func (container *colSegContainer) addCollection(collMeta *etcdpb.CollectionMeta, colMetaBlob string) error {
container.mu.Lock()
defer container.mu.Unlock()
var newCollection = newCollection(collMeta, collMetaBlob)
var newCollection = newCollection(collMeta, colMetaBlob)
container.collections = append(container.collections, newCollection)
return nil
@ -206,6 +206,7 @@ func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSe
}
statisticData = append(statisticData, &stat)
segment.recentlyModified = false
}
return &internalpb.QueryNodeSegStats{

View File

@ -17,9 +17,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
const ctxTimeInMillisecond = 2000
const closeWithDeadline = true
// NOTE: start pulsar before test
func TestManipulationService_Start(t *testing.T) {
var ctx context.Context

View File

@ -56,6 +56,20 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
// check if segment exists, if not, create this segment
if !(*iNode.container).hasSegment(task.SegmentID) {
collection, err := (*iNode.container).getCollectionByName(task.CollectionName)
if err != nil {
log.Println(err)
continue
}
err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID())
if err != nil {
log.Println(err)
continue
}
}
}
// 2. do preInsert

View File

@ -28,7 +28,7 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
}
// update service time
stNode.node.tSafe = serviceTimeMsg.timeRange.timestampMax
stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax)
return nil
}

View File

@ -363,7 +363,7 @@ func (mService *metaService) loadSegments() error {
//----------------------------------------------------------------------- Unmarshal and Marshal
func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta {
col := etcdpb.CollectionMeta{}
err := proto.Unmarshal([]byte(value), &col)
err := proto.UnmarshalText(value, &col)
if err != nil {
log.Println(err)
return nil
@ -372,17 +372,17 @@ func (mService *metaService) collectionUnmarshal(value string) *etcdpb.Collectio
}
func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string {
value, err := proto.Marshal(col)
if err != nil {
log.Println(err)
value := proto.MarshalTextString(col)
if value == "" {
log.Println("marshal collection failed")
return ""
}
return string(value)
return value
}
func (mService *metaService) segmentUnmarshal(value string) *etcdpb.SegmentMeta {
seg := etcdpb.SegmentMeta{}
err := proto.Unmarshal([]byte(value), &seg)
err := proto.UnmarshalText(value, &seg)
if err != nil {
log.Println(err)
return nil
@ -391,10 +391,10 @@ func (mService *metaService) segmentUnmarshal(value string) *etcdpb.SegmentMeta
}
func (mService *metaService) segmentMarshal(seg *etcdpb.SegmentMeta) string {
value, err := proto.Marshal(seg)
if err != nil {
log.Println(err)
value := proto.MarshalTextString(seg)
if value == "" {
log.Println("marshal segment failed")
return ""
}
return string(value)
return value
}

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,7 @@ import "C"
import (
"context"
"sync"
)
type QueryNode struct {
@ -22,7 +23,7 @@ type QueryNode struct {
QueryNodeID uint64
pulsarURL string
tSafe Timestamp
tSafe tSafe
container *container
@ -32,6 +33,16 @@ type QueryNode struct {
statsService *statsService
}
type tSafe interface {
getTSafe() Timestamp
setTSafe(t Timestamp)
}
type serviceTime struct {
tSafeMu sync.Mutex
time Timestamp
}
func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode {
segmentsMap := make(map[int64]*Segment)
collections := make([]*Collection, 0)
@ -41,13 +52,15 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu
segments: segmentsMap,
}
var tSafe tSafe = &serviceTime{}
return &QueryNode{
ctx: ctx,
QueryNodeID: queryNodeID,
pulsarURL: pulsarURL,
tSafe: 0,
tSafe: tSafe,
container: &container,
@ -65,7 +78,7 @@ func (node *QueryNode) Start() {
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
go node.dataSyncService.start()
go node.searchService.start()
// go node.searchService.start()
go node.metaService.start()
node.statsService.start()
}
@ -73,3 +86,15 @@ func (node *QueryNode) Start() {
func (node *QueryNode) Close() {
// TODO: close services
}
func (st *serviceTime) getTSafe() Timestamp {
st.tSafeMu.Lock()
defer st.tSafeMu.Unlock()
return st.time
}
func (st *serviceTime) setTSafe(t Timestamp) {
st.tSafeMu.Lock()
st.time = t
st.tSafeMu.Unlock()
}

View File

@ -1,85 +1,37 @@
package reader
//import (
// "context"
// "testing"
//
// "github.com/stretchr/testify/assert"
// "github.com/zilliztech/milvus-distributed/internal/conf"
//)
//
//func TestQueryNode_CreateQueryNode(t *testing.T) {
// conf.LoadConfig("config.yaml")
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// node := CreateQueryNode(ctx, 0, 0, nil)
// assert.NotNil(t, node)
//}
//
//func TestQueryNode_NewQueryNode(t *testing.T) {
// conf.LoadConfig("config.yaml")
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// node := NewQueryNode(ctx, 0, 0)
// assert.NotNil(t, node)
//}
//
//func TestQueryNode_Close(t *testing.T) {
// conf.LoadConfig("config.yaml")
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// node := CreateQueryNode(ctx, 0, 0, nil)
// assert.NotNil(t, node)
//
// node.Close()
//}
//
//func TestQueryNode_QueryNodeDataInit(t *testing.T) {
// conf.LoadConfig("config.yaml")
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// node := CreateQueryNode(ctx, 0, 0, nil)
// assert.NotNil(t, node)
//
// node.QueryNodeDataInit()
//
// assert.NotNil(t, node.deletePreprocessData)
// assert.NotNil(t, node.insertData)
// assert.NotNil(t, node.deleteData)
//}
//
//func TestQueryNode_NewCollection(t *testing.T) {
// conf.LoadConfig("config.yaml")
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// node := CreateQueryNode(ctx, 0, 0, nil)
// assert.NotNil(t, node)
//
// var collection = node.newCollection(0, "collection0", "")
//
// assert.Equal(t, collection.CollectionName, "collection0")
// assert.Equal(t, len(node.Collections), 1)
//}
//
//func TestQueryNode_DeleteCollection(t *testing.T) {
// conf.LoadConfig("config.yaml")
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// node := CreateQueryNode(ctx, 0, 0, nil)
// assert.NotNil(t, node)
//
// var collection = node.newCollection(0, "collection0", "")
//
// assert.Equal(t, collection.CollectionName, "collection0")
// assert.Equal(t, len(node.Collections), 1)
//
// node.deleteCollection(collection)
//
// assert.Equal(t, len(node.Collections), 0)
//}
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
gParams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil"
)
const ctxTimeInMillisecond = 2000
const closeWithDeadline = true
// NOTE: start pulsar and etcd before test
func TestQueryNode_start(t *testing.T) {
err := gParams.GParams.LoadYaml("config.yaml")
assert.NoError(t, err)
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
pulsarAddr, _ := gParams.GParams.Load("pulsar.address")
pulsarPort, _ := gParams.GParams.Load("pulsar.port")
pulsarAddr += ":" + pulsarPort
pulsarAddr = "pulsar://" + pulsarAddr
node := NewQueryNode(ctx, 0, pulsarAddr)
node.Start()
}

View File

@ -1,96 +0,0 @@
package reader
//import (
// "context"
// "strconv"
// "sync"
// "testing"
// "time"
//
// "github.com/stretchr/testify/assert"
// "github.com/zilliztech/milvus-distributed/internal/conf"
// "github.com/zilliztech/milvus-distributed/internal/msgclient"
//)
//
//const ctxTimeInMillisecond = 10
//
//// NOTE: start pulsar and etcd before test
//func TestReader_startQueryNode(t *testing.T) {
// conf.LoadConfig("config.yaml")
//
// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
// ctx, cancel := context.WithDeadline(context.Background(), d)
// defer cancel()
//
// pulsarAddr := "pulsar://"
// pulsarAddr += conf.Config.Pulsar.Address
// pulsarAddr += ":"
// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
//
// StartQueryNode(ctx, pulsarAddr)
//
// // To make sure to get here
// assert.Equal(t, 0, 0)
//}
//
//// NOTE: start pulsar before test
//func TestReader_RunInsertDelete(t *testing.T) {
// conf.LoadConfig("config.yaml")
//
// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
// ctx, cancel := context.WithDeadline(context.Background(), d)
// defer cancel()
//
// mc := msgclient.ReaderMessageClient{}
// pulsarAddr := "pulsar://"
// pulsarAddr += conf.Config.Pulsar.Address
// pulsarAddr += ":"
// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
//
// mc.InitClient(ctx, pulsarAddr)
// mc.ReceiveMessage()
//
// node := CreateQueryNode(ctx, 0, 0, &mc)
//
// wg := sync.WaitGroup{}
//
// wg.Add(1)
// go node.RunInsertDelete(&wg)
// wg.Wait()
//
// node.Close()
//
// // To make sure to get here
// assert.Equal(t, 0, 0)
//}
//
//// NOTE: start pulsar before test
//func TestReader_RunSearch(t *testing.T) {
// conf.LoadConfig("config.yaml")
//
// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
// ctx, cancel := context.WithDeadline(context.Background(), d)
// defer cancel()
//
// mc := msgclient.ReaderMessageClient{}
// pulsarAddr := "pulsar://"
// pulsarAddr += conf.Config.Pulsar.Address
// pulsarAddr += ":"
// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
//
// mc.InitClient(ctx, pulsarAddr)
// mc.ReceiveMessage()
//
// node := CreateQueryNode(ctx, 0, 0, &mc)
//
// wg := sync.WaitGroup{}
//
// wg.Add(1)
// go node.RunSearch(&wg)
// wg.Wait()
//
// node.Close()
//
// // To make sure to get here
// assert.Equal(t, 0, 0)
//}

View File

@ -151,6 +151,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
}
s.recentlyModified = true
return nil
}

View File

@ -3,6 +3,7 @@ package reader
import (
"context"
"fmt"
"log"
"strconv"
"time"
@ -13,35 +14,55 @@ import (
type statsService struct {
ctx context.Context
msgStream *msgstream.PulsarMsgStream
pulsarURL string
msgStream *msgstream.MsgStream
container *container
}
func newStatsService(ctx context.Context, container *container, pulsarAddress string) *statsService {
// TODO: add pulsar message stream init
func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService {
return &statsService{
ctx: ctx,
pulsarURL: pulsarURL,
msgStream: nil,
container: container,
}
}
func (sService *statsService) start() {
sleepMillisecondTime := 1000
const (
receiveBufSize = 1024
sleepMillisecondTime = 1000
)
// start pulsar
producerChannels := []string{"statistic"}
statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize)
statsStream.SetPulsarCient(sService.pulsarURL)
statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream
sService.msgStream = &statsMsgStream
(*sService.msgStream).Start()
// start service
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
select {
case <-sService.ctx.Done():
return
default:
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
case <-time.After(sleepMillisecondTime * time.Millisecond):
sService.sendSegmentStatistic()
}
}
}
func (sService *statsService) sendSegmentStatistic() {
var statisticData = (*sService.container).getSegmentStatistics()
statisticData := (*sService.container).getSegmentStatistics()
// fmt.Println("Publish segment statistic")
// fmt.Println(statisticData)
@ -49,5 +70,18 @@ func (sService *statsService) sendSegmentStatistic() {
}
func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSegStats) {
// TODO: publish statistic
var msg msgstream.TsMsg = &msgstream.QueryNodeSegStatsMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []int32{0},
},
QueryNodeSegStats: *statistic,
}
var msgPack = msgstream.MsgPack{
Msgs: []*msgstream.TsMsg{&msg},
}
err := (*sService.msgStream).Produce(&msgPack)
if err != nil {
log.Println(err)
}
}

View File

@ -1,70 +1,187 @@
package reader
//import (
// "context"
// "strconv"
// "testing"
// "time"
//
// "github.com/zilliztech/milvus-distributed/internal/conf"
// "github.com/zilliztech/milvus-distributed/internal/msgclient"
//)
//
//// NOTE: start pulsar before test
//func TestSegmentManagement_SegmentStatistic(t *testing.T) {
// conf.LoadConfig("config.yaml")
//
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// mc := msgclient.ReaderMessageClient{}
// pulsarAddr := "pulsar://"
// pulsarAddr += conf.Config.Pulsar.Address
// pulsarAddr += ":"
// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
//
// mc.InitClient(ctx, pulsarAddr)
// mc.ReceiveMessage()
//
// node := CreateQueryNode(ctx, 0, 0, &mc)
//
// // Construct node, collection, partition and segment
// var collection = node.newCollection(0, "collection0", "")
// var partition = collection.newPartition("partition0")
// var segment = partition.newSegment(0)
// node.SegmentsMap[0] = segment
//
// node.SegmentStatistic(1000)
//
// node.Close()
//}
//
//// NOTE: start pulsar before test
//func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
// conf.LoadConfig("config.yaml")
//
// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
// ctx, cancel := context.WithDeadline(context.Background(), d)
// defer cancel()
//
// mc := msgclient.ReaderMessageClient{}
// pulsarAddr := "pulsar://"
// pulsarAddr += conf.Config.Pulsar.Address
// pulsarAddr += ":"
// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
//
// mc.InitClient(ctx, pulsarAddr)
// mc.ReceiveMessage()
//
// node := CreateQueryNode(ctx, 0, 0, &mc)
//
// // Construct node, collection, partition and segment
// var collection = node.newCollection(0, "collection0", "")
// var partition = collection.newPartition("partition0")
// var segment = partition.newSegment(0)
// node.SegmentsMap[0] = segment
//
// node.SegmentStatisticService()
//
// node.Close()
//}
import (
"context"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
// NOTE: start pulsar before test
func TestStatsService_start(t *testing.T) {
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
// init query node
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err)
// start stats service
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
node.statsService.start()
}
// NOTE: start pulsar before test
func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
// init query node
pulsarURL := "pulsar://localhost:6650"
node := NewQueryNode(ctx, 0, pulsarURL)
// init meta
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.container).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.container).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, "collection0")
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.container).getCollectionNum(), 1)
err = (*node.container).addPartition(collection.ID(), collectionMeta.PartitionTags[0])
assert.NoError(t, err)
segmentID := UniqueID(0)
err = (*node.container).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0))
assert.NoError(t, err)
const receiveBufSize = 1024
// start pulsar
producerChannels := []string{"statistic"}
statsStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
statsStream.SetPulsarCient(pulsarURL)
statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream
node.statsService = newStatsService(node.ctx, node.container, node.pulsarURL)
node.statsService.msgStream = &statsMsgStream
(*node.statsService.msgStream).Start()
// send stats
node.statsService.sendSegmentStatistic()
}

View File

@ -1,202 +0,0 @@
package reader
//import (
// "context"
// "strconv"
// "testing"
// "time"
//
// "github.com/zilliztech/milvus-distributed/internal/conf"
// "github.com/zilliztech/milvus-distributed/internal/msgclient"
// msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message"
//
// "github.com/stretchr/testify/assert"
//)
//
//// NOTE: start pulsar before test
//func TestUtilFunctions_GetKey2Segments(t *testing.T) {
// conf.LoadConfig("config.yaml")
//
// d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
// ctx, cancel := context.WithDeadline(context.Background(), d)
// defer cancel()
//
// mc := msgclient.ReaderMessageClient{}
// pulsarAddr := "pulsar://"
// pulsarAddr += conf.Config.Pulsar.Address
// pulsarAddr += ":"
// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
//
// mc.InitClient(ctx, pulsarAddr)
// mc.ReceiveMessage()
//
// node := CreateQueryNode(ctx, 0, 0, &mc)
//
// node.messageClient.PrepareKey2SegmentMsg()
//
// const msgLength = 10
//
// for i := 0; i < msgLength; i++ {
// key2SegMsg := msgPb.Key2SegMsg{
// Uid: int64(i),
// Timestamp: uint64(i + 1000),
// SegmentID: []int64{int64(i)},
// }
// node.messageClient.Key2SegMsg = append(node.messageClient.Key2SegMsg, &key2SegMsg)
// }
//
// entityIDs, timestamps, segmentIDs := node.GetKey2Segments()
//
// assert.Equal(t, len(*entityIDs), msgLength)
// assert.Equal(t, len(*timestamps), msgLength)
// assert.Equal(t, len(*segmentIDs), msgLength)
//
// node.Close()
//}
//
//func TestUtilFunctions_GetCollectionByID(t *testing.T) {
// ctx := context.Background()
//
// node := NewQueryNode(ctx, 0, 0)
// var collection = node.newCollection(0, "collection0", "")
// var partition = collection.newPartition("partition0")
// var segment = partition.newSegment(0)
//
// node.SegmentsMap[int64(0)] = segment
//
// assert.Equal(t, collection.CollectionName, "collection0")
// assert.Equal(t, partition.partitionTag, "partition0")
// assert.Equal(t, segment.SegmentID, int64(0))
// assert.Equal(t, len(node.SegmentsMap), 1)
//
// c := node.getCollectionByID(int64(0))
// assert.Equal(t, c.CollectionName, "collection0")
//
// partition.deleteSegment(node, segment)
// collection.deletePartition(node, partition)
// node.deleteCollection(collection)
//
// assert.Equal(t, len(node.Collections), 0)
// assert.Equal(t, len(node.SegmentsMap), 0)
//
// node.Close()
//}
//
//func TestUtilFunctions_GetCollectionByCollectionName(t *testing.T) {
// ctx := context.Background()
// // 1. Construct node, and collections
// node := NewQueryNode(ctx, 0, 0)
// var _ = node.newCollection(0, "collection0", "")
//
// // 2. Get collection by collectionName
// var c0, err = node.getCollectionByCollectionName("collection0")
// assert.NoError(t, err)
// assert.Equal(t, c0.CollectionName, "collection0")
//
// c0 = node.getCollectionByID(0)
// assert.NotNil(t, c0)
// assert.Equal(t, c0.CollectionID, uint64(0))
//
// node.Close()
//}
//
//func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) {
// ctx := context.Background()
//
// // 1. Construct node, collection, partition and segment
// node := NewQueryNode(ctx, 0, 0)
// var collection = node.newCollection(0, "collection0", "")
// var partition = collection.newPartition("partition0")
// var segment = partition.newSegment(0)
// node.SegmentsMap[0] = segment
//
// // 2. Get segment by segment id
// var s0, err = node.getSegmentBySegmentID(0)
// assert.NoError(t, err)
// assert.Equal(t, s0.SegmentID, int64(0))
//
// node.Close()
//}
//
//func TestUtilFunctions_FoundSegmentBySegmentID(t *testing.T) {
// ctx := context.Background()
//
// node := NewQueryNode(ctx, 0, 0)
// var collection = node.newCollection(0, "collection0", "")
// var partition = collection.newPartition("partition0")
// var segment = partition.newSegment(0)
//
// node.SegmentsMap[int64(0)] = segment
//
// assert.Equal(t, collection.CollectionName, "collection0")
// assert.Equal(t, partition.partitionTag, "partition0")
// assert.Equal(t, segment.SegmentID, int64(0))
// assert.Equal(t, len(node.SegmentsMap), 1)
//
// b1 := node.foundSegmentBySegmentID(int64(0))
// assert.Equal(t, b1, true)
//
// b2 := node.foundSegmentBySegmentID(int64(1))
// assert.Equal(t, b2, false)
//
// partition.deleteSegment(node, segment)
// collection.deletePartition(node, partition)
// node.deleteCollection(collection)
//
// assert.Equal(t, len(node.Collections), 0)
// assert.Equal(t, len(node.SegmentsMap), 0)
//
// node.Close()
//}
//
//func TestUtilFunctions_GetPartitionByName(t *testing.T) {
// ctx := context.Background()
//
// node := NewQueryNode(ctx, 0, 0)
// var collection = node.newCollection(0, "collection0", "")
// var partition = collection.newPartition("partition0")
//
// var p = collection.getPartitionByName("partition0")
// assert.Equal(t, p.partitionTag, "partition0")
//
// collection.deletePartition(node, partition)
// node.deleteCollection(collection)
//
// node.Close()
//}
//
//// NOTE: start pulsar before test
//func TestUtilFunctions_PrepareBatchMsg(t *testing.T) {
// conf.LoadConfig("config.yaml")
//
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
//
// mc := msgclient.ReaderMessageClient{}
// pulsarAddr := "pulsar://"
// pulsarAddr += conf.Config.Pulsar.Address
// pulsarAddr += ":"
// pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
//
// mc.InitClient(ctx, pulsarAddr)
// mc.ReceiveMessage()
//
// node := CreateQueryNode(ctx, 0, 0, &mc)
//
// node.PrepareBatchMsg()
// node.Close()
//}
//
//func TestUtilFunctions_QueryJson2Info(t *testing.T) {
// ctx := context.Background()
// node := NewQueryNode(ctx, 0, 0)
//
// var queryJSON = "{\"field_name\":\"age\",\"num_queries\":1,\"topK\":10}"
// info := node.queryJSON2Info(&queryJSON)
//
// assert.Equal(t, info.FieldName, "age")
// assert.Equal(t, info.NumQueries, int64(1))
// assert.Equal(t, info.TopK, 10)
//
// node.Close()
//}

View File

@ -1,12 +1,10 @@
package tsoutil
import (
"fmt"
"path"
"time"
"github.com/zilliztech/milvus-distributed/internal/kv"
gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil"
"go.etcd.io/etcd/clientv3"
)
@ -27,25 +25,10 @@ func ParseTS(ts uint64) (time.Time, uint64) {
return physicalTime, logical
}
func NewTSOKVBase(subPath string) *kv.EtcdKV {
etcdAddr, err := gparams.GParams.Load("etcd.address")
if err != nil {
panic(err)
}
etcdPort, err := gparams.GParams.Load("etcd.port")
if err != nil {
panic(err)
}
etcdAddr = etcdAddr + ":" + etcdPort
fmt.Println("etcdAddr ::: ", etcdAddr)
func NewTSOKVBase(etcdAddr []string, tsoRoot, subPath string) *kv.EtcdKV {
client, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
Endpoints: etcdAddr,
DialTimeout: 5 * time.Second,
})
etcdRootPath, err := gparams.GParams.Load("etcd.rootpath")
if err != nil {
panic(err)
}
return kv.NewEtcdKV(client, path.Join(etcdRootPath, subPath))
return kv.NewEtcdKV(client, path.Join(tsoRoot, subPath))
}