mirror of https://github.com/milvus-io/milvus.git
Fix watch channel with nil seek position (#11650)
issue: #11649 Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: sunby <bingyi.sun@zilliz.com>pull/11654/head
parent
2ad1ef2618
commit
7a3873960b
|
@ -804,16 +804,9 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr
|
|||
}
|
||||
// use collection start position when segment position is not found
|
||||
if seekPosition == nil {
|
||||
coll := s.meta.GetCollection(collectionID)
|
||||
if coll != nil {
|
||||
for _, sp := range coll.GetStartPositions() {
|
||||
if sp.GetKey() == rootcoord.ToPhysicalChannel(channel) {
|
||||
seekPosition = &internalpb.MsgPosition{
|
||||
ChannelName: channel,
|
||||
MsgID: sp.GetData(),
|
||||
}
|
||||
}
|
||||
}
|
||||
collection := s.GetCollection(s.ctx, collectionID)
|
||||
if collection != nil {
|
||||
seekPosition = getCollectionStartPosition(channel, collection)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -826,6 +819,19 @@ func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFr
|
|||
}
|
||||
}
|
||||
|
||||
func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition {
|
||||
for _, sp := range collectionInfo.GetStartPositions() {
|
||||
if sp.GetKey() != rootcoord.ToPhysicalChannel(channel) {
|
||||
continue
|
||||
}
|
||||
return &internalpb.MsgPosition{
|
||||
ChannelName: channel,
|
||||
MsgID: sp.GetData(),
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
|
||||
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
|
||||
return &datapb.SegmentInfo{
|
||||
|
@ -841,3 +847,16 @@ func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
|
|||
DmlPosition: info.DmlPosition,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) GetCollection(ctx context.Context, collectionID UniqueID) *datapb.CollectionInfo {
|
||||
coll := s.meta.GetCollection(collectionID)
|
||||
if coll != nil {
|
||||
return coll
|
||||
}
|
||||
err := s.loadCollectionFromRootCoord(ctx, collectionID)
|
||||
if err != nil {
|
||||
log.Warn("failed to load collection from RootCoord", zap.Int64("collectionID", collectionID), zap.Error(err))
|
||||
}
|
||||
|
||||
return s.meta.GetCollection(collectionID)
|
||||
}
|
||||
|
|
|
@ -114,13 +114,8 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
|
|||
zap.String("channelName", r.GetChannelName()),
|
||||
zap.Uint32("count", r.GetCount()))
|
||||
|
||||
if coll := s.meta.GetCollection(r.CollectionID); coll == nil {
|
||||
if err := s.loadCollectionFromRootCoord(ctx, r.CollectionID); err != nil {
|
||||
log.Error("load collection from rootcoord error",
|
||||
zap.Int64("collectionID", r.CollectionID),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if coll := s.GetCollection(ctx, r.CollectionID); coll == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
s.cluster.Watch(r.ChannelName, r.CollectionID)
|
||||
|
|
Loading…
Reference in New Issue