diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 70a8a7b646..f144fefe03 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -457,7 +457,13 @@ func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool { } func (t *ShowPartitionReqTask) Execute() error { - coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + var coll *etcdpb.CollectionInfo + var err error + if t.Req.CollectionName == "" { + coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID) + } else { + coll, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName) + } if err != nil { return err } diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index 1aa3229954..f70ce3d914 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -32,7 +32,6 @@ type QueryNodeStatsMsg = msgstream.QueryNodeStatsMsg type RepackFunc = msgstream.RepackFunc type Consumer = pulsar.Consumer type Producer = pulsar.Producer -type MessageID = pulsar.MessageID type UnmarshalDispatcher = msgstream.UnmarshalDispatcher type PulsarMsgStream struct { @@ -47,8 +46,6 @@ type PulsarMsgStream struct { wait *sync.WaitGroup streamCancel func() pulsarBufSize int64 - consumerLock *sync.Mutex - consumerReflects []reflect.SelectCase } func newPulsarMsgStream(ctx context.Context, @@ -61,30 +58,22 @@ func newPulsarMsgStream(ctx context.Context, producers := make([]Producer, 0) consumers := make([]Consumer, 0) consumerChannels := make([]string, 0) - consumerReflects := make([]reflect.SelectCase, 0) - receiveBuf := make(chan *MsgPack, receiveBufSize) - - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) - if err != nil { - defer streamCancel() - log.Printf("Set pulsar client failed, error = %v", err) - return nil, err - } - stream := &PulsarMsgStream{ ctx: streamCtx, - client: client, + streamCancel: streamCancel, producers: producers, consumers: consumers, consumerChannels: consumerChannels, unmarshal: unmarshal, pulsarBufSize: pulsarBufSize, - receiveBuf: receiveBuf, - streamCancel: streamCancel, - consumerReflects: consumerReflects, - consumerLock: &sync.Mutex{}, } - + client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) + if err != nil { + log.Printf("Set pulsar client failed, error = %v", err) + return nil, err + } + stream.client = client + stream.receiveBuf = make(chan *MsgPack, receiveBufSize) return stream, nil } @@ -129,14 +118,7 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string, return errors.New("pulsar is not ready, consumer is nil") } - ms.consumerLock.Lock() ms.consumers = append(ms.consumers, pc) - ms.consumerChannels = append(ms.consumerChannels, channels[i]) - ms.consumerReflects = append(ms.consumerReflects, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(pc.Chan()), - }) - ms.consumerLock.Unlock() return nil } err := util.Retry(10, time.Millisecond*200, fn) @@ -317,6 +299,12 @@ func (ms *PulsarMsgStream) Consume() *MsgPack { func (ms *PulsarMsgStream) bufMsgPackToChannel() { defer ms.wait.Done() + cases := make([]reflect.SelectCase, len(ms.consumers)) + for i := 0; i < len(ms.consumers); i++ { + ch := ms.consumers[i].Chan() + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} + } + for { select { case <-ms.ctx.Done(): @@ -326,9 +314,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { tsMsgList := make([]TsMsg, 0) for { - ms.consumerLock.Lock() - chosen, value, ok := reflect.Select(ms.consumerReflects) - ms.consumerLock.Unlock() + chosen, value, ok := reflect.Select(cases) if !ok { log.Printf("channel closed") return @@ -353,11 +339,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { log.Printf("Failed to unmarshal tsMsg, error = %v", err) continue } - - tsMsg.SetPosition(&msgstream.MsgPosition{ - ChannelName: filepath.Base(pulsarMsg.Topic()), - MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), - }) tsMsgList = append(tsMsgList, tsMsg) noMoreMessage := true @@ -405,7 +386,6 @@ func (ms *PulsarMsgStream) Seek(mp *internalpb2.MsgPosition) error { type PulsarTtMsgStream struct { PulsarMsgStream unsolvedBuf map[Consumer][]TsMsg - unsolvedMutex *sync.Mutex lastTimeStamp Timestamp } @@ -414,53 +394,27 @@ func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64, pulsarBufSize int64, unmarshal msgstream.UnmarshalDispatcher) (*PulsarTtMsgStream, error) { - pulsarMsgStream, err := newPulsarMsgStream(ctx, address, receiveBufSize, pulsarBufSize, unmarshal) + + streamCtx, streamCancel := context.WithCancel(ctx) + pulsarMsgStream := PulsarMsgStream{ + ctx: streamCtx, + streamCancel: streamCancel, + pulsarBufSize: pulsarBufSize, + unmarshal: unmarshal, + } + + client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) if err != nil { + log.Printf("Set pulsar client failed, error = %v", err) return nil, err } - unsolvedBuf := make(map[Consumer][]TsMsg) - + pulsarMsgStream.client = client + pulsarMsgStream.receiveBuf = make(chan *MsgPack, receiveBufSize) return &PulsarTtMsgStream{ - PulsarMsgStream: *pulsarMsgStream, - unsolvedBuf: unsolvedBuf, - unsolvedMutex: &sync.Mutex{}, + PulsarMsgStream: pulsarMsgStream, }, nil } -func (ms *PulsarTtMsgStream) AsConsumer(channels []string, - subName string) { - for i := 0; i < len(channels); i++ { - fn := func() error { - receiveChannel := make(chan pulsar.ConsumerMessage, ms.pulsarBufSize) - pc, err := ms.client.Subscribe(pulsar.ConsumerOptions{ - Topic: channels[i], - SubscriptionName: subName, - Type: pulsar.KeyShared, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, - MessageChannel: receiveChannel, - }) - if err != nil { - return err - } - if pc == nil { - return errors.New("pulsar is not ready, consumer is nil") - } - - ms.consumerLock.Lock() - ms.consumers = append(ms.consumers, pc) - ms.unsolvedBuf[pc] = make([]TsMsg, 0) - ms.consumerChannels = append(ms.consumerChannels, channels[i]) - ms.consumerLock.Unlock() - return nil - } - err := util.Retry(10, time.Millisecond*200, fn) - if err != nil { - errMsg := "Failed to create consumer " + channels[i] + ", error = " + err.Error() - panic(errMsg) - } - } -} - func (ms *PulsarTtMsgStream) Start() { ms.wait = &sync.WaitGroup{} if ms.consumers != nil { @@ -474,32 +428,33 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { ms.unsolvedBuf = make(map[Consumer][]TsMsg) isChannelReady := make(map[Consumer]bool) eofMsgTimeStamp := make(map[Consumer]Timestamp) - + for _, consumer := range ms.consumers { + ms.unsolvedBuf[consumer] = make([]TsMsg, 0) + } for { select { case <-ms.ctx.Done(): return default: wg := sync.WaitGroup{} + mu := sync.Mutex{} findMapMutex := sync.RWMutex{} - ms.consumerLock.Lock() for _, consumer := range ms.consumers { if isChannelReady[consumer] { continue } wg.Add(1) - go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex) + go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &mu, &findMapMutex) } wg.Wait() timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex) - ms.consumerLock.Unlock() if !ok || timeStamp <= ms.lastTimeStamp { //log.Printf("All timeTick's timestamps are inconsistent") continue } + timeTickBuf := make([]TsMsg, 0) msgPositions := make([]*internalpb2.MsgPosition, 0) - ms.unsolvedMutex.Lock() for consumer, msgs := range ms.unsolvedBuf { tempBuffer := make([]TsMsg, 0) var timeTickMsg TsMsg @@ -530,7 +485,6 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { }) } } - ms.unsolvedMutex.Unlock() msgPack := MsgPack{ BeginTs: ms.lastTimeStamp, @@ -548,6 +502,7 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, eofMsgMap map[Consumer]Timestamp, wg *sync.WaitGroup, + mu *sync.Mutex, findMapMutex *sync.RWMutex) { defer wg.Done() for { @@ -564,13 +519,14 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, headerMsg := commonpb.MsgHeader{} err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) if err != nil { - log.Printf("Failed to unmarshal message header, error = %v", err) - continue + log.Printf("Failed to unmarshal, error = %v", err) } tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) + if tsMsg == nil && err != nil { + panic("null unMarshalFunc for " + headerMsg.Base.MsgType.String() + " msg type") + } if err != nil { - log.Printf("Failed to unmarshal tsMsg, error = %v", err) - continue + log.Printf("Failed to unmarshal, error = %v", err) } // set pulsar info to tsMsg tsMsg.SetPosition(&msgstream.MsgPosition{ @@ -578,9 +534,9 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), }) - ms.unsolvedMutex.Lock() + mu.Lock() ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) - ms.unsolvedMutex.Unlock() + mu.Unlock() if headerMsg.Base.MsgType == commonpb.MsgType_kTimeTick { findMapMutex.Lock() @@ -593,60 +549,50 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, } func (ms *PulsarTtMsgStream) Seek(mp *internalpb2.MsgPosition) error { - var consumer Consumer - var messageID MessageID for index, channel := range ms.consumerChannels { if filepath.Base(channel) == filepath.Base(mp.ChannelName) { - seekMsgID, err := typeutil.StringToPulsarMsgID(mp.MsgID) + messageID, err := typeutil.StringToPulsarMsgID(mp.MsgID) + if err != nil { + return err + } + consumer := ms.consumers[index] + err = (consumer).Seek(messageID) if err != nil { return err } - consumer = ms.consumers[index] - messageID = seekMsgID - break - } - } - if consumer != nil { - err := (consumer).Seek(messageID) - if err != nil { - return err - } - - ms.unsolvedMutex.Lock() - ms.unsolvedBuf[consumer] = make([]TsMsg, 0) - for { - select { - case <-ms.ctx.Done(): - return nil - case pulsarMsg, ok := <-consumer.Chan(): - if !ok { - return errors.New("consumer closed") - } - consumer.Ack(pulsarMsg) - - headerMsg := commonpb.MsgHeader{} - err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) - if err != nil { - log.Printf("Failed to unmarshal message header, error = %v", err) - } - tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) - if err != nil { - log.Printf("Failed to unmarshal tsMsg, error = %v", err) - } - if tsMsg.Type() == commonpb.MsgType_kTimeTick { - if tsMsg.BeginTs() >= mp.Timestamp { - ms.unsolvedMutex.Unlock() - return nil + for { + select { + case <-ms.ctx.Done(): + return nil + case pulsarMsg, ok := <-consumer.Chan(): + if !ok { + return errors.New("consumer closed") + } + consumer.Ack(pulsarMsg) + + headerMsg := commonpb.MsgHeader{} + err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) + if err != nil { + log.Printf("Failed to unmarshal msgHeader, error = %v", err) + } + tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) + if tsMsg == nil && err != nil { + panic("null unMarshalFunc for " + headerMsg.Base.MsgType.String() + " msg type") + + } + if err != nil { + log.Printf("Failed to unmarshal pulsarMsg, error = %v", err) + } + if tsMsg.Type() == commonpb.MsgType_kTimeTick { + if tsMsg.BeginTs() >= mp.Timestamp { + return nil + } + continue + } + if tsMsg.BeginTs() > mp.Timestamp { + ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) } - continue - } - if tsMsg.BeginTs() > mp.Timestamp { - tsMsg.SetPosition(&msgstream.MsgPosition{ - ChannelName: filepath.Base(pulsarMsg.Topic()), - MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), - }) - ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) } } } diff --git a/internal/msgstream/pulsarms/pulsar_msgstream_test.go b/internal/msgstream/pulsarms/pulsar_msgstream_test.go index b406cbb742..2fcfad927e 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream_test.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream_test.go @@ -568,7 +568,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_kInsert, 9, 9)) msgPack4 := MsgPack{} - msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11, 11, 11)) + msgPack4.Msgs = append(msgPack2.Msgs, getTimeTickMsg(11, 11, 11)) msgPack5 := MsgPack{} msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15, 15, 15)) diff --git a/tests/python/requirements.txt b/tests/python/requirements.txt index c7be00ee04..fcf08cf5d0 100644 --- a/tests/python/requirements.txt +++ b/tests/python/requirements.txt @@ -2,7 +2,7 @@ grpcio==1.26.0 grpcio-tools==1.26.0 numpy==1.18.1 pytest-cov==2.8.1 -pymilvus-distributed==0.0.20 +pymilvus-distributed==0.0.19 sklearn==0.0 pytest==4.5.0 pytest-timeout==1.3.3