mirror of https://github.com/milvus-io/milvus.git
Create a goroutine to receive msg for every consumer
Signed-off-by: sunby <bingyi.sun@zilliz.com>pull/4973/head^2
parent
861576f77a
commit
b07a371f25
4
go.mod
4
go.mod
|
@ -3,7 +3,8 @@ module github.com/zilliztech/milvus-distributed
|
|||
go 1.15
|
||||
|
||||
require (
|
||||
github.com/apache/pulsar-client-go v0.3.0
|
||||
github.com/apache/pulsar-client-go v0.1.1
|
||||
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4
|
||||
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
|
||||
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
|
||||
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
|
||||
|
@ -11,6 +12,7 @@ require (
|
|||
github.com/go-basic/ipv4 v1.0.0
|
||||
github.com/gogo/protobuf v1.3.1
|
||||
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
|
||||
github.com/golang/mock v1.3.1
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/google/btree v1.0.0
|
||||
github.com/klauspost/compress v1.10.11 // indirect
|
||||
|
|
10
go.sum
10
go.sum
|
@ -22,10 +22,15 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
|
|||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
|
||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/apache/pulsar-client-go v0.1.1 h1:v/kU+2ZCC6yFIcbZrFtWa9/nvVzVr18L+xYJUvZSxEQ=
|
||||
github.com/apache/pulsar-client-go v0.1.1/go.mod h1:mlxC65KL1BLhGO2bnT9zWMttVzR2czVPb27D477YpyU=
|
||||
github.com/apache/pulsar-client-go v0.3.0 h1:rNhJ/ENwoEfZPHHwUHNxPBTNqNQE2LQEm7DXu043giM=
|
||||
github.com/apache/pulsar-client-go v0.3.0/go.mod h1:9eSgOadVhCfb2DfWtS1SCYaYIMk9VDOZztr4u3FO8cQ=
|
||||
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb h1:E1P0FudxDdj2RhbveZC9i3PwukLCA/4XQSkBS/dw6/I=
|
||||
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
|
||||
github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs=
|
||||
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4 h1:orNYqmQGnSjgOauLWjHEp9/qIDT98xv/0Aa4Zet3/Y8=
|
||||
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4/go.mod h1:V/LzksIyqd3KZuQ2SunvReTG/UkArhII1dAWY5U1sCE=
|
||||
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
|
||||
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
|
||||
github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
|
||||
|
@ -92,6 +97,7 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+ne
|
|||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/frankban/quicktest v1.10.2 h1:19ARM85nVi4xH7xPXuc5eM/udya5ieh7b/Sv+d844Tk=
|
||||
github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
|
||||
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
|
@ -329,6 +335,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
|
|||
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
|
||||
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/protocolbuffers/protobuf v3.15.1+incompatible h1:HfHKbdokPc6vkBuYYXW/PTpOIo8BICHl1vfE8dT5jGo=
|
||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
|
||||
|
@ -336,6 +343,7 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
|||
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
|
@ -379,6 +387,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1
|
|||
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
|
||||
github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ=
|
||||
github.com/valyala/gozstd v1.7.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
|
||||
github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
|
||||
|
|
|
@ -4,8 +4,6 @@ import (
|
|||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/apache/pulsar-client-go/pulsar/log"
|
||||
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
||||
etcd "go.etcd.io/etcd/clientv3"
|
||||
|
@ -79,51 +77,6 @@ func (w *zapWrapper) V(l int) bool {
|
|||
return w.logger.Core().Enabled(zapcore.Level(zapLevel))
|
||||
}
|
||||
|
||||
func (w *zapWrapper) SubLogger(fields log.Fields) log.Logger {
|
||||
return w.WithFields(fields).(log.Logger)
|
||||
}
|
||||
|
||||
func (w *zapWrapper) WithFields(fields log.Fields) log.Entry {
|
||||
if len(fields) == 0 {
|
||||
return w
|
||||
}
|
||||
kv := make([]interface{}, 0, 2*len(fields))
|
||||
for k, v := range fields {
|
||||
kv = append(kv, k, v)
|
||||
}
|
||||
return &zapWrapper{
|
||||
logger: w.logger.Sugar().With(kv...).Desugar(),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *zapWrapper) WithField(name string, value interface{}) log.Entry {
|
||||
return &zapWrapper{
|
||||
logger: w.logger.Sugar().With(name, value).Desugar(),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *zapWrapper) WithError(err error) log.Entry {
|
||||
return &zapWrapper{
|
||||
logger: w.logger.Sugar().With("error", err).Desugar(),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *zapWrapper) Debug(args ...interface{}) {
|
||||
w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Debug(args...)
|
||||
}
|
||||
|
||||
func (w *zapWrapper) Warn(args ...interface{}) {
|
||||
w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warn(args...)
|
||||
}
|
||||
|
||||
func (w *zapWrapper) Debugf(format string, args ...interface{}) {
|
||||
w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Debugf(format, args...)
|
||||
}
|
||||
|
||||
func (w *zapWrapper) Warnf(format string, args ...interface{}) {
|
||||
w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warnf(format, args...)
|
||||
}
|
||||
|
||||
var once sync.Once
|
||||
var _globalZapWrapper atomic.Value
|
||||
|
||||
|
|
|
@ -90,6 +90,7 @@ func newPulsarMsgStream(ctx context.Context,
|
|||
streamCancel: streamCancel,
|
||||
consumerReflects: consumerReflects,
|
||||
consumerLock: &sync.Mutex{},
|
||||
wait: &sync.WaitGroup{},
|
||||
}
|
||||
|
||||
return stream, nil
|
||||
|
@ -136,14 +137,14 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string,
|
|||
return errors.New("pulsar is not ready, consumer is nil")
|
||||
}
|
||||
|
||||
ms.consumerLock.Lock()
|
||||
ms.consumers = append(ms.consumers, pc)
|
||||
ms.consumerChannels = append(ms.consumerChannels, channels[i])
|
||||
ms.consumerReflects = append(ms.consumerReflects, reflect.SelectCase{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(pc.Chan()),
|
||||
})
|
||||
ms.consumerLock.Unlock()
|
||||
ms.wait.Add(1)
|
||||
go ms.receiveMsg(pc)
|
||||
return nil
|
||||
}
|
||||
err := util.Retry(10, time.Millisecond*200, fn)
|
||||
|
@ -159,15 +160,11 @@ func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) {
|
|||
}
|
||||
|
||||
func (ms *PulsarMsgStream) Start() {
|
||||
ms.wait = &sync.WaitGroup{}
|
||||
if ms.consumers != nil {
|
||||
ms.wait.Add(1)
|
||||
go ms.bufMsgPackToChannel()
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *PulsarMsgStream) Close() {
|
||||
ms.streamCancel()
|
||||
ms.wait.Wait()
|
||||
|
||||
for _, producer := range ms.producers {
|
||||
if producer != nil {
|
||||
|
@ -321,9 +318,42 @@ func (ms *PulsarMsgStream) Consume() *MsgPack {
|
|||
}
|
||||
}
|
||||
|
||||
func (ms *PulsarMsgStream) bufMsgPackToChannel() {
|
||||
func (ms *PulsarMsgStream) receiveMsg(consumer Consumer) {
|
||||
defer ms.wait.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ms.ctx.Done():
|
||||
return
|
||||
case pulsarMsg, ok := <-consumer.Chan():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
headerMsg := commonpb.MsgHeader{}
|
||||
err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg)
|
||||
if err != nil {
|
||||
log.Error("Failed to unmarshal message header", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType)
|
||||
if err != nil {
|
||||
log.Error("Failed to unmarshal tsMsg", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
tsMsg.SetPosition(&msgstream.MsgPosition{
|
||||
ChannelName: filepath.Base(pulsarMsg.Topic()),
|
||||
MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()),
|
||||
})
|
||||
|
||||
msgPack := MsgPack{Msgs: []TsMsg{tsMsg}}
|
||||
ms.receiveBuf <- &msgPack
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *PulsarMsgStream) bufMsgPackToChannel() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ms.ctx.Done():
|
||||
|
@ -500,7 +530,6 @@ func (ms *PulsarTtMsgStream) AsConsumer(channels []string,
|
|||
}
|
||||
|
||||
func (ms *PulsarTtMsgStream) Start() {
|
||||
ms.wait = &sync.WaitGroup{}
|
||||
if ms.consumers != nil {
|
||||
ms.wait.Add(1)
|
||||
go ms.bufMsgPackToChannel()
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
|
@ -237,9 +239,10 @@ func receiveMsg(outputStream msgstream.MsgStream, msgCount int) {
|
|||
|
||||
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"insert1", "insert2"}
|
||||
consumerChannels := []string{"insert1", "insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
|
||||
|
@ -259,10 +262,10 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_Delete(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"delete"}
|
||||
consumerChannels := []string{"delete"}
|
||||
consumerSubName := "subDelete"
|
||||
|
||||
c := funcutil.RandomString(8)
|
||||
producerChannels := []string{c}
|
||||
consumerChannels := []string{c}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 1, 1))
|
||||
//msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 3, 3))
|
||||
|
@ -279,9 +282,10 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_Search(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"search"}
|
||||
consumerChannels := []string{"search"}
|
||||
consumerSubName := "subSearch"
|
||||
c := funcutil.RandomString(8)
|
||||
producerChannels := []string{c}
|
||||
consumerChannels := []string{c}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 1, 1))
|
||||
|
@ -299,10 +303,10 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"searchResult"}
|
||||
consumerChannels := []string{"searchResult"}
|
||||
consumerSubName := "subSearchResult"
|
||||
|
||||
c := funcutil.RandomString(8)
|
||||
producerChannels := []string{c}
|
||||
consumerChannels := []string{c}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
|
||||
|
@ -319,10 +323,10 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"timeTick"}
|
||||
consumerChannels := []string{"timeTick"}
|
||||
consumerSubName := "subTimeTick"
|
||||
|
||||
c := funcutil.RandomString(8)
|
||||
producerChannels := []string{c}
|
||||
consumerChannels := []string{c}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
|
||||
|
@ -339,9 +343,10 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"insert1", "insert2"}
|
||||
consumerChannels := []string{"insert1", "insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
|
||||
|
@ -359,9 +364,10 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"insert1", "insert2"}
|
||||
consumerChannels := []string{"insert1", "insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1))
|
||||
|
@ -379,10 +385,10 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"insert1", "insert2"}
|
||||
consumerChannels := []string{"insert1", "insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
baseMsg := msgstream.BaseMsg{
|
||||
BeginTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
|
@ -433,9 +439,10 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"insert1", "insert2"}
|
||||
consumerChannels := []string{"insert1", "insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
baseMsg := msgstream.BaseMsg{
|
||||
BeginTimestamp: 0,
|
||||
|
@ -484,9 +491,10 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"insert1", "insert2"}
|
||||
consumerChannels := []string{"insert1", "insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1))
|
||||
|
@ -515,10 +523,10 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"insert1", "insert2"}
|
||||
consumerChannels := []string{"insert1", "insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
msgPack0 := msgstream.MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
|
||||
|
||||
|
@ -549,9 +557,10 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"seek_insert1", "seek_insert2"}
|
||||
consumerChannels := []string{"seek_insert1", "seek_insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack0 := MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
|
||||
|
@ -602,9 +611,10 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
|
|||
|
||||
func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
producerChannels := []string{"insert1", "insert2"}
|
||||
consumerChannels := []string{"insert1", "insert2"}
|
||||
consumerSubName := "subInsert"
|
||||
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
|
||||
producerChannels := []string{c1, c2}
|
||||
consumerChannels := []string{c1, c2}
|
||||
consumerSubName := funcutil.RandomString(8)
|
||||
|
||||
msgPack0 := msgstream.MsgPack{}
|
||||
msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0))
|
||||
|
|
|
@ -177,7 +177,6 @@ func (pt *ParamTable) initQueryNodeIDList() []UniqueID {
|
|||
}
|
||||
ret = append(ret, UniqueID(v))
|
||||
}
|
||||
pt.QueryNodeIDList = ret
|
||||
return ret
|
||||
}
|
||||
|
||||
|
|
|
@ -114,12 +114,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
|
|||
return err
|
||||
}
|
||||
|
||||
//fmt.Println("srcFieldIDs in internal:", srcFieldIDs)
|
||||
//fmt.Println("dstFieldIDs in internal:", fieldIDs)
|
||||
targetFields, err := s.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
targetFields := s.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
|
||||
err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -1142,7 +1142,7 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
|
|||
paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
|
||||
assert.NoError(t, err)
|
||||
|
||||
fieldsMap, _ := node.loadService.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs)
|
||||
fieldsMap := node.loadService.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
|
||||
assert.Equal(t, len(fieldsMap), 2)
|
||||
|
||||
segment, err := node.replica.getSegmentByID(segmentID)
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"github.com/zilliztech/milvus-distributed/internal/storage"
|
||||
|
@ -54,11 +53,11 @@ func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*intern
|
|||
}
|
||||
|
||||
pathResponse, err := loader.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest)
|
||||
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if len(pathResponse.FieldIDs) != len(pathResponse.Paths) || len(pathResponse.FieldIDs) <= 0 {
|
||||
if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
|
||||
return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
|
||||
}
|
||||
|
||||
|
@ -83,7 +82,7 @@ func (loader *segmentLoader) filterOutVectorFields(fieldIDs []int64, vectorField
|
|||
return targetFields
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) checkTargetFields(paths []*internalpb2.StringList, srcFieldIDs []int64, dstFieldIDs []int64) (map[int64]*internalpb2.StringList, error) {
|
||||
func (loader *segmentLoader) getTargetFields(paths []*internalpb2.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalpb2.StringList {
|
||||
targetFields := make(map[int64]*internalpb2.StringList)
|
||||
|
||||
containsFunc := func(s []int64, e int64) bool {
|
||||
|
@ -95,14 +94,13 @@ func (loader *segmentLoader) checkTargetFields(paths []*internalpb2.StringList,
|
|||
return false
|
||||
}
|
||||
|
||||
for i, fieldID := range dstFieldIDs {
|
||||
if !containsFunc(srcFieldIDs, fieldID) {
|
||||
return nil, errors.New("uncompleted fields")
|
||||
for i, fieldID := range srcFieldIDS {
|
||||
if containsFunc(dstFields, fieldID) {
|
||||
targetFields[fieldID] = paths[i]
|
||||
}
|
||||
targetFields[fieldID] = paths[i]
|
||||
}
|
||||
|
||||
return targetFields, nil
|
||||
return targetFields
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetFields map[int64]*internalpb2.StringList) error {
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
package funcutil
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||
|
||||
func RandomString(n int) string {
|
||||
b := make([]rune, n)
|
||||
for i := range b {
|
||||
b[i] = letterRunes[rand.Intn(len(letterRunes))]
|
||||
}
|
||||
return string(b)
|
||||
}
|
Loading…
Reference in New Issue