mirror of https://github.com/milvus-io/milvus.git
Split Etcd save operation into small txns (#17386)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/17440/head
parent
70f8bea4b4
commit
6426e3d037
|
@ -32,7 +32,8 @@ import (
|
|||
const (
|
||||
bufferID = math.MinInt64
|
||||
delimiter = "/"
|
||||
maxOperationsPerTxn = 128
|
||||
maxOperationsPerTxn = 64
|
||||
maxBytesPerTxn = 1024 * 1024
|
||||
)
|
||||
|
||||
var errUnknownOpType = errors.New("unknown operation type")
|
||||
|
|
|
@ -250,6 +250,13 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
log.Info("update flush segments info", zap.Int64("segmentId", segmentID),
|
||||
zap.Int("binlog", len(binlogs)),
|
||||
zap.Int("statslog", len(statslogs)),
|
||||
zap.Int("deltalogs", len(deltalogs)),
|
||||
zap.Bool("flushed", flushed),
|
||||
zap.Bool("dropped", dropped),
|
||||
zap.Bool("importing", importing))
|
||||
segment := m.segments.GetSegment(segmentID)
|
||||
if importing {
|
||||
m.segments.SetRowCount(segmentID, segment.currRows)
|
||||
|
@ -273,7 +280,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
clonedSegment.DroppedAt = uint64(time.Now().UnixNano())
|
||||
modSegments[segmentID] = clonedSegment
|
||||
}
|
||||
|
||||
// TODO add diff encoding and compression
|
||||
currBinlogs := clonedSegment.GetBinlogs()
|
||||
|
||||
var getFieldBinlogs = func(id UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog {
|
||||
|
@ -390,6 +397,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
for id, s := range modSegments {
|
||||
m.segments.SetSegment(id, s)
|
||||
}
|
||||
log.Info("update flush segments info successfully", zap.Int64("segmentId", segmentID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -502,7 +510,7 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) *SegmentInfo {
|
|||
// ** the last batch must contains at least one segment
|
||||
// 1. when failure occurs between batches, failover mechanism will continue with the earlist checkpoint of this channel
|
||||
// since the flag is not marked so DataNode can re-consume the drop collection msg
|
||||
// 2. when failure occurs between save meta and unwatch channel, the removal flag shall be check before let datanode watch this channel
|
||||
// 2. when failure occurs between save meta and unwatch channel, the removal flag shall be check before let datanode watch this channel
|
||||
func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*SegmentInfo) error {
|
||||
|
||||
// the limitation of etcd operations number per transaction is 128, since segment number might be enormous so we shall split
|
||||
|
@ -510,23 +518,21 @@ func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*Segm
|
|||
|
||||
// since the removal flag shall always be with the last batch, so the last batch shall be maxOperationNumber - 1
|
||||
for len(modSegments) > maxOperationsPerTxn-1 {
|
||||
err := m.saveDropSegmentAndRemove(channel, modSegments, false, func(kv map[string]string, modSegments map[int64]*SegmentInfo) bool {
|
||||
// batch filled or only one segment left
|
||||
// since the last batch must contains at least on segment
|
||||
return len(kv) == maxOperationsPerTxn || len(modSegments) == 1
|
||||
})
|
||||
err := m.saveDropSegmentAndRemove(channel, modSegments, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// removal flag should be saved with last batch
|
||||
return m.saveDropSegmentAndRemove(channel, modSegments, true, func(_ map[string]string, _ map[int64]*SegmentInfo) bool { return false })
|
||||
return m.saveDropSegmentAndRemove(channel, modSegments, true)
|
||||
}
|
||||
|
||||
func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*SegmentInfo, withFlag bool, stopper func(kv map[string]string, modSegment map[int64]*SegmentInfo) bool) error {
|
||||
func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*SegmentInfo, withFlag bool) error {
|
||||
kv := make(map[string]string)
|
||||
update := make([]*SegmentInfo, 0, maxOperationsPerTxn)
|
||||
|
||||
size := 0
|
||||
for id, s := range modSegments {
|
||||
key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID())
|
||||
delete(modSegments, id)
|
||||
|
@ -536,7 +542,8 @@ func (m *meta) saveDropSegmentAndRemove(channel string, modSegments map[int64]*S
|
|||
}
|
||||
kv[key] = string(segBytes)
|
||||
update = append(update, s)
|
||||
if stopper(kv, modSegments) {
|
||||
size += len(key) + len(segBytes)
|
||||
if len(kv) == maxOperationsPerTxn || len(modSegments) == 1 || size >= maxBytesPerTxn {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue