Add some log and improve TestSessionProcessActiveStandBy test case (#27403)

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/27291/head
SimFG 2023-09-28 09:35:27 +08:00 committed by GitHub
parent 505e8509b7
commit c9653b1683
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 3 deletions

View File

@ -504,7 +504,11 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
ret := make([]UniqueID, 0, len(s.segments))
for _, id := range s.segments {
info := s.meta.GetHealthySegment(id)
if info == nil || info.InsertChannel != channel {
if info == nil {
continue
}
if info.InsertChannel != channel {
log.Warn("the channel of flushable segments isn't equal", zap.String("insert_channel", info.InsertChannel), zap.String("channel", channel), zap.Int64("segment", id))
continue
}
if s.flushPolicy(info, t) {

View File

@ -659,8 +659,19 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
log.Debug("Stop session 1, session 2 will take over primary service")
assert.False(t, flag)
s1.Stop()
<-signal
s1.safeCloseLiveCh()
{
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, _ = s1.etcdCli.Revoke(ctx, *s1.leaseID)
}
select {
case <-signal:
log.Debug("receive s1 signal")
case <-time.After(10 * time.Second):
log.Debug("wait to fail Liveness Check timeout")
t.FailNow()
}
assert.True(t, flag)
wg.Wait()

View File

@ -869,6 +869,8 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi
MsgID: msg.ID().Serialize(),
})
ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
} else {
log.Info("skip msg", zap.Any("msg", tsMsg))
}
}
}