mirror of https://github.com/milvus-io/milvus.git
Fix too many operations in txn request when save segmentInfo (#10909)
Signed-off-by: xige-16 <xi.ge@zilliz.com>pull/10922/head
parent
bb96a8ffe3
commit
245874a970
|
@ -455,7 +455,8 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
|
|||
globalSeekPositionTmp = queryChannelInfo.SeekPosition
|
||||
}
|
||||
|
||||
saveKvs := make(map[string]string)
|
||||
// save segmentInfo to etcd
|
||||
segmentInfoKvs := make(map[string]string)
|
||||
for _, infos := range saves {
|
||||
for _, info := range infos {
|
||||
segmentInfoBytes, err := proto.Marshal(info)
|
||||
|
@ -463,10 +464,18 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
|
|||
return col2SegmentChangeInfos, err
|
||||
}
|
||||
segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID)
|
||||
saveKvs[segmentKey] = string(segmentInfoBytes)
|
||||
segmentInfoKvs[segmentKey] = string(segmentInfoBytes)
|
||||
}
|
||||
}
|
||||
for key, value := range segmentInfoKvs {
|
||||
err := m.client.Save(key, value)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// save queryChannelInfo and sealedSegmentsChangeInfo to etcd
|
||||
saveKvs := make(map[string]string)
|
||||
for collectionID, queryChannelInfo := range queryChannelInfosMap {
|
||||
channelInfoBytes, err := proto.Marshal(queryChannelInfo)
|
||||
if err != nil {
|
||||
|
@ -496,9 +505,9 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
|
|||
|
||||
err = m.client.MultiSave(saveKvs)
|
||||
if err != nil {
|
||||
log.Error("updateGlobalSealedSegmentInfos: save info to etcd error", zap.Error(err))
|
||||
return col2SegmentChangeInfos, err
|
||||
panic(err)
|
||||
}
|
||||
|
||||
m.segmentMu.Lock()
|
||||
for _, segmentInfos := range saves {
|
||||
for _, info := range segmentInfos {
|
||||
|
@ -576,6 +585,15 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
|
|||
}
|
||||
queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos
|
||||
|
||||
// remove meta from etcd
|
||||
for _, info := range removes {
|
||||
segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID)
|
||||
err = m.client.Remove(segmentKey)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// save meta to etcd
|
||||
saveKvs := make(map[string]string)
|
||||
channelInfoBytes, err := proto.Marshal(queryChannelInfo)
|
||||
|
@ -601,17 +619,11 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
|
|||
changeInfoKey := fmt.Sprintf("%s/%d", sealedSegmentChangeInfoPrefix, segmentChangeInfos.Base.MsgID)
|
||||
saveKvs[changeInfoKey] = string(changeInfoBytes)
|
||||
|
||||
removeKeys := make([]string, 0)
|
||||
for _, info := range removes {
|
||||
segmentKey := fmt.Sprintf("%s/%d", segmentMetaPrefix, info.SegmentID)
|
||||
removeKeys = append(removeKeys, segmentKey)
|
||||
err = m.client.MultiSave(saveKvs)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = m.client.MultiSaveAndRemove(saveKvs, removeKeys)
|
||||
if err != nil {
|
||||
log.Error("updateGlobalSealedSegmentInfos: save info to etcd error", zap.Error(err))
|
||||
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
|
||||
}
|
||||
m.segmentMu.Lock()
|
||||
for _, info := range removes {
|
||||
delete(m.segmentInfos, info.SegmentID)
|
||||
|
|
|
@ -730,8 +730,9 @@ func Test_reverseSealedSegmentChangeInfo(t *testing.T) {
|
|||
}
|
||||
queryCoord.meta.setKvClient(kv)
|
||||
|
||||
err = updateSegmentInfoFromTask(ctx, parentTask, queryCoord.meta)
|
||||
assert.NotNil(t, err)
|
||||
assert.Panics(t, func() {
|
||||
updateSegmentInfoFromTask(ctx, parentTask, queryCoord.meta)
|
||||
})
|
||||
|
||||
queryCoord.Stop()
|
||||
err = removeAllSession()
|
||||
|
|
Loading…
Reference in New Issue